Coverage for biobb_common / biobb_common / generic / biobb_object.py: 49%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 13:18 +0000

1"""Module containing the BiobbObject generic parent class.""" 

2import difflib 

3import importlib 

4import os 

5import shutil 

6import warnings 

7import argparse 

8from logging import Logger 

9from pathlib import Path 

10from pydoc import locate 

11from sys import platform 

12from typing import Optional, Union 

13from biobb_common.configuration import settings 

14from biobb_common.command_wrapper import cmd_wrapper 

15from biobb_common.tools import file_utils as fu 

16from biobb_common import biobb_global_properties 

17 

18 

19class BiobbObject: 

20 """ 

21 | biobb_common BiobbObject 

22 | Generic parent class for the rest of the Biobb clases. 

23 | The BiobbOject class contains all the properties and methods that are common to all the biobb blocks. 

24 

25 Args: 

26 properties (dict - Python dictionary object containing the tool parameters, not input/output files): 

27 

28 * **io_dict** (*dict*) - ({}) Input/Output files dictionary. 

29 * **container_path** (*str*) - (None) Path to the binary executable of your container. 

30 * **container_image** (*str*) - (None) Container Image identifier. 

31 * **container_volume_path** (*str*) - ("/data") Path to an internal directory in the container. 

32 * **container_working_dir** (*str*) - (None) Path to the internal CWD in the container. 

33 * **container_user_id** (*str*) - (None) User number id to be mapped inside the container. 

34 * **container_shell_path** (*str*) - ("/bin/bash -c") Path to the binary executable of the container shell. 

35 * **container_generic_command** (*str*) - ("run") Which command typically run or exec will be used to execute your image. 

36 * **stage_io_dict** (*dict*) - ({}) Stage Input/Output files dictionary. 

37 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

38 * **disable_sandbox** (*bool*) - (False) Disable the use of temporal unique directories aka sandbox. Only for local execution. 

39 * **global_properties_list** (*list*) - ([]) list of global properties. 

40 * **chdir_sandbox** (*bool*) - (False) Change directory to the sandbox using just file names in the command line. Only for local execution. 

41 * **binary_path** (*str*) - ('') Path to the binary executable. 

42 * **disable_logs** (*bool*) - (False) Disable the logs. 

43 * **global_log** (*Logger object*) - (None) Log from the main workflow. 

44 * **out_log** (*Logger object*) - (None) Log from the step. 

45 * **err_log** (*Logger object*) - (None) Error log from the step. 

46 * **out_log_path** (*str*) - (None) Path to the log file. 

47 * **err_log_path** (*str*) - (None) Path to the error log file. 

48 * **can_write_console_log** (*bool*) - (True) Can write console log. 

49 * **can_write_file_log** (*bool*) - (True) Can write file log. 

50 * **prefix** (*str*) - (None) Prefix if provided. 

51 * **step** (*str*) - (None) Name of the step. 

52 * **path** (*str*) - ('') Absolute path to the step working dir. 

53 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

54 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

55 * **cmd** (*list*) - ([]) Command line list, NOT read from the dictionary. 

56 * **return_code** (*int*) - (0) Return code of the command execution, NOT read from the dictionary. 

57 * **timeout** (*int*) - (None) Timeout for the execution of the command. 

58 * **tmp_files** (*list*) - ([]) list of temporal files, NOT read from the dictionary. 

59 * **env_vars_dict** (*dict*) - ({}) Environment Variables dictionary. 

60 * **shell_path** (*str*) - ("/bin/bash") Path to the binary executable of the shell. 

61 * **dev** (*str*) - (None) Development options. 

62 * **check_extensions** (*bool*) - (True) Check extensions of the input/output files. 

63 * **check_var_typing** (*bool*) - (True) Check typing of the input/output files. 

64 * **locals_var_dict** (*dict*) - ({}) Local variables dictionary. 

65 * **doc_arguments_dict** (*dict*) - ({}) Documentation arguments dictionary. 

66 * **doc_properties_dict** (*dict*) - ({}) Documentation properties dictionary. 

67 

68 

69 """ 

70 

def __init__(self, properties=None, **kwargs) -> None:  # type: ignore
    """Initialise the attributes shared by every biobb building block.

    The global biobb properties are used as a base and the ``properties``
    given by the caller are merged on top of them, so local values win.

    Args:
        properties (dict): Tool parameters for this step (not input/output
            files). ``None`` is treated as an empty dictionary.
        **kwargs: Accepted by the signature but not read in this method.
    """
    # Merge global properties, prioritizing local ones.
    # The parentheses matter: ``|`` binds tighter than ``or``, so without
    # them a ``None`` ``properties`` is merged directly and raises TypeError.
    properties = biobb_global_properties.dict() | (properties or {})

    # Input/Output files
    self.io_dict: dict[str, dict[str, str]] = {"in": {}, "out": {}}

    # container Specific
    self.container_path: Optional[str] = properties.get("container_path")
    self.container_image: str = properties.get("container_image", '')
    self.container_volume_path: str = properties.get("container_volume_path", "/data")
    self.container_working_dir: Optional[str] = properties.get("container_working_dir")
    self.container_user_id: Optional[str] = properties.get("container_user_id")
    self.container_shell_path: str = properties.get("container_shell_path", "/bin/bash -c")
    self.container_generic_command: str = properties.get("container_generic_command", "run")

    # stage
    self.stage_io_dict: dict[str, dict[str, str]] = {"in": {}, "out": {}}
    self.sandbox_path: Union[str, Path] = properties.get("sandbox_path", Path.cwd())
    self.disable_sandbox: bool = properties.get("disable_sandbox", False)

    # Properties common in all BB
    self.global_properties_list: list[str] = properties.get("global_properties_list", [])
    self.chdir_sandbox: bool = properties.get("chdir_sandbox", False)
    self.binary_path: str = properties.get("binary_path", '')
    self.disable_logs: bool = properties.get("disable_logs", False)
    self.global_log: Optional[Logger] = properties.get("global_log", None)
    # The step loggers are not read from the properties; they start unset.
    self.out_log: Optional[Logger] = None
    self.err_log: Optional[Logger] = None
    self.out_log_path: Optional[Union[Path, str]] = properties.get("out_log_path", None)
    self.err_log_path: Optional[Union[Path, str]] = properties.get("err_log_path", None)
    self.can_write_console_log: bool = properties.get("can_write_console_log", True)
    self.can_write_file_log: bool = properties.get("can_write_file_log", True)
    self.prefix: Optional[str] = properties.get("prefix", None)
    self.step: Optional[str] = properties.get("step", None)
    self.path: str = properties.get("path", "")
    self.remove_tmp: bool = properties.get("remove_tmp", True)
    self.restart: bool = properties.get("restart", False)
    # cmd, return_code and tmp_files are runtime state, never read from the dictionary.
    self.cmd: list[str] = []
    self.return_code: int = 0
    self.timeout: Optional[int] = properties.get("timeout", None)
    self.tmp_files: list[Union[str, Path]] = []
    self.env_vars_dict: dict = properties.get("env_vars_dict", {})
    # Falls back to the SHELL environment variable, then to /bin/bash.
    self.shell_path: Union[str, Path] = properties.get("shell_path", os.getenv("SHELL", "/bin/bash"))
    self.dev: Optional[str] = properties.get("dev", None)
    self.check_extensions: bool = properties.get("check_extensions", True)
    self.check_var_typing: bool = properties.get("check_var_typing", True)
    self.locals_var_dict: dict[str, str] = {}
    # Argument/property documentation is extracted from the (sub)class docstring.
    self.doc_arguments_dict, self.doc_properties_dict = fu.get_doc_dicts(self.__doc__)

    # Version of the top-level package that defines the concrete class;
    # None when the package cannot be imported or exposes no __version__.
    try:
        self.version = importlib.import_module(
            self.__module__.split(".")[0]
        ).__version__
    except Exception:
        self.version = None

127 

def check_arguments(
    self,
    output_files_created: bool = False,
    raise_exception: bool = True
):
    """Run ``fu.check_argument`` on every documented input/output argument.

    The value of each argument is looked up by name in
    ``self.locals_var_dict``; a missing or empty value is passed on as
    ``None`` instead of a ``Path``.

    Args:
        output_files_created (bool): Forwarded to ``fu.check_argument``;
            when true an empty line is logged after all the checks.
        raise_exception (bool): Forwarded to ``fu.check_argument``.
    """
    for name, spec in self.doc_arguments_dict.items():
        raw_value = self.locals_var_dict.get(name)
        arg_path = Path(raw_value) if raw_value else None
        direction = spec.get("input_output", "").lower().strip()
        fu.check_argument(
            path=arg_path,
            argument=name,
            optional=spec.get("optional", False),
            module_name=self.__module__,
            input_output=direction,
            output_files_created=output_files_created,
            type=spec.get("type", None),
            extension_list=list(spec.get("formats")),
            check_extensions=self.check_extensions,
            raise_exception=raise_exception,
            out_log=self.out_log,
        )

    if not output_files_created:
        return
    fu.log("", self.out_log, self.global_log)

152 

def check_properties(
    self,
    properties: dict,
    reserved_properties: Optional[set[str]] = None,
    check_var_typing: bool = False,
):
    """Warn about unrecognized properties and, optionally, mistyped values.

    A property is recognized when its name is an attribute of this
    instance, one of the built-in reserved names, one of
    ``reserved_properties`` or listed in ``self.global_properties_list``.
    Nothing is raised: every finding is reported with ``warnings.warn``.

    Args:
        properties (dict): Properties supplied by the user.
        reserved_properties (set[str]): Extra names to accept silently.
        check_var_typing (bool): When true, compare each value with the
            type documented for it in ``self.doc_properties_dict``.
    """
    reserved = {"system", "working_dir_path", "tool"}
    reserved |= set(reserved_properties or ())
    reserved |= set(self.global_properties_list)

    # Check types
    if check_var_typing and self.doc_properties_dict:
        for prop, value in properties.items():
            prop_doc = self.doc_properties_dict.get(prop)
            if not prop_doc:
                continue
            type_name = prop_doc.get("type")
            # Resolve the documented type name once; a missing name would
            # make ``locate`` fail, so it is treated as "nothing to check".
            expected = locate(type_name) if type_name else None
            if not isinstance(expected, type):
                # The name does not resolve to a class: there is nothing
                # meaningful to compare the value against.
                continue
            if not isinstance(value, expected):
                warnings.warn(
                    f"Warning: {prop} property type not recognized. Got {type(value)} Expected {expected}"
                )

    known = self.__dict__.keys()
    # A list keeps the warnings in the order the user wrote the properties.
    unknown = [prop for prop in properties if prop not in known and prop not in reserved]
    for error_property in unknown:
        # The very low cutoff means a suggestion is offered almost always.
        matches = difflib.get_close_matches(error_property, known, n=1, cutoff=0.01)
        close_property = matches[0] if matches else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (error_property, close_property)
        )

191 

def check_init(self, properties):
    """Validate the user input of this block: first its properties, then its arguments.

    Args:
        properties (dict): Properties handed to ``check_properties``.
    """
    # Properties first, then the input/output arguments (default options for both).
    self.check_properties(properties)
    self.check_arguments()

196 

def check_restart(self) -> bool:
    """Tell whether this step can be skipped because ``restart`` is enabled.

    The module name and version are logged first when a version is known.

    Returns:
        bool: True only when ``self.restart`` is set and
        ``fu.check_complete_files`` accepts the output files of
        ``self.io_dict``; False otherwise.
    """
    if self.version:
        fu.log(
            f"Module: {self.__module__} Version: {self.version}",
            self.out_log, self.global_log
        )

    if self.restart and fu.check_complete_files(self.io_dict["out"].values()):  # type: ignore
        fu.log("Restart is enabled, this step: %s will be skipped" % self.step, self.out_log, self.global_log)
        return True
    return False

209 

def stage_files(self):
    """Stage the input/output files in a temporal unique directory aka sandbox.

    Fills ``self.stage_io_dict`` with the paths the command line must use
    and records the directory in use under its ``"unique_dir"`` key. With
    ``disable_sandbox`` nothing is copied and the current working
    directory plays the role of the unique directory.
    """
    if self.disable_sandbox:
        # NOTE: dict.copy() is shallow, the inner "in"/"out" dictionaries
        # stay shared with self.io_dict.
        self.stage_io_dict = self.io_dict.copy()
        # If we are not using a sandbox, we use the current working directory as the unique directory
        self.stage_io_dict["unique_dir"] = os.getcwd()
        return

    # Create a unique directory for the sandbox
    unique_dir = str(Path(fu.create_unique_dir(path=str(self.sandbox_path), prefix="sandbox_", out_log=self.out_log)).resolve())
    self.stage_io_dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # Only remove unique_dir if using sandbox
    self.tmp_files.append(unique_dir)

    # Last path component, used only in the log message (portable basename).
    sandbox_name = Path(unique_dir).name

    for io in ("in", "out"):
        for file_ref, file_path in self.io_dict.get(io, {}).items():
            if not file_path:
                # Skip optional files not set
                continue
            file_path = Path(file_path)
            # Assign INTERNAL PATH to IN/OUT files
            if file_path.exists() or io == "out":
                if io == "in":
                    fu.log(f"Copy to stage: {file_path} --> {sandbox_name}", self.out_log)
                    doc = self.doc_arguments_dict.get(file_ref)
                    # Documented directories are copied as trees, unless
                    # they are provided as a .zip file.
                    if doc and doc.get('type') == 'dir' and file_path.suffix != '.zip':
                        shutil.copytree(file_path, os.path.join(unique_dir, file_path.name))
                    else:
                        shutil.copy2(file_path, unique_dir)
                # Container
                if self.container_path:
                    self.stage_io_dict[io][file_ref] = os.path.join(self.container_volume_path, file_path.name)
                # Local
                else:
                    self.stage_io_dict[io][file_ref] = os.path.join(unique_dir, file_path.name)
                    # chdir_sandbox is a local-execution option: the command
                    # runs inside the sandbox, so bare file names are enough.
                    if self.chdir_sandbox:
                        self.stage_io_dict[io][file_ref] = file_path.name
            elif io == "in":
                # Default IN files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
                self.stage_io_dict[io][file_ref] = file_path.name

250 

def create_cmd_line(self) -> None:
    """Finalise ``self.cmd`` in place for the configured execution backend.

    Development options (``self.dev``) are appended first. Then, depending
    on how ``self.container_path`` ends, the command is wrapped into a
    Singularity, Docker or pcocc invocation; any other value means local
    execution and leaves the command as it is. As a side effect
    ``self.container_path`` is normalised to ``""`` when unset.

    Raises:
        FileNotFoundError: If the Singularity image does not exist locally
            and it is still missing after trying to pull it.
    """
    empty_cmd_warning = "WARNING: The command-line is empty your container should know what to do automatically."

    def wrap_for_container(container_cmd: list, quote: str) -> None:
        """Append shell + quoted inner command and install the result as self.cmd."""
        if not self.cmd and not self.container_shell_path:
            fu.log(empty_cmd_warning, self.out_log, self.global_log)
        else:
            container_cmd.append(self.container_shell_path)
            container_cmd.append(quote + " ".join(self.cmd) + quote)
        self.cmd = container_cmd

    # Not documented and not listed option, only for devs
    if self.dev:
        fu.log(f"Adding development options: {self.dev}",
               self.out_log, self.global_log)
        self.cmd += self.dev.split()

    # Containers
    host_volume: str = str(self.stage_io_dict.get("unique_dir", ''))
    self.container_path = self.container_path or ""

    # Singularity
    if self.container_path.endswith("singularity"):
        fu.log(
            "Using Singularity image %s" % self.container_image,
            self.out_log,
            self.global_log,
        )
        if not Path(self.container_image).exists():
            fu.log(
                f"{self.container_image} does not exist trying to pull it",
                self.out_log,
                self.global_log,
            )
            container_image_name = str(
                Path(self.container_image).with_suffix(".sif").name
            )
            singularity_pull_cmd = [
                self.container_path,
                "pull",
                "--name",
                container_image_name,
                self.container_image,
            ]
            try:
                cmd_wrapper.CmdWrapper(
                    singularity_pull_cmd, self.shell_path, self.out_log
                ).launch()
                if not Path(container_image_name).exists():
                    raise FileNotFoundError(container_image_name)
            except FileNotFoundError as err:
                pull_error = f"{' '.join(singularity_pull_cmd)} not found"
                fu.log(pull_error, self.out_log, self.global_log)
                # Same exception type as before, now carrying the message
                # and the original cause.
                raise FileNotFoundError(pull_error) from err
            self.container_image = container_image_name

        singularity_cmd = [self.container_path, self.container_generic_command]
        # If we are working on a mac skip the -e option because is still no available
        if platform != "darwin":
            singularity_cmd.append("-e")

        if self.env_vars_dict:
            singularity_cmd.append("--env")
            singularity_cmd.append(
                ",".join(
                    f"{env_var_name}='{env_var_value}'"
                    for env_var_name, env_var_value in self.env_vars_dict.items()
                )
            )

        singularity_cmd.extend(
            [
                "--bind",
                host_volume + ":" + self.container_volume_path,
                self.container_image,
            ]
        )
        wrap_for_container(singularity_cmd, '"')

    # Docker
    elif self.container_path.endswith("docker"):
        fu.log("Using Docker image %s" % self.container_image,
               self.out_log, self.global_log)
        docker_cmd = [self.container_path, self.container_generic_command]
        for env_var_name, env_var_value in (self.env_vars_dict or {}).items():
            docker_cmd.append("-e")
            docker_cmd.append(f"{env_var_name}='{env_var_value}'")
        if self.container_working_dir:
            docker_cmd.append("-w")
            docker_cmd.append(self.container_working_dir)
        if self.container_volume_path:
            docker_cmd.append("-v")
            docker_cmd.append(host_volume + ":" + self.container_volume_path)
        if self.container_user_id:
            docker_cmd.append("--user")
            docker_cmd.append(self.container_user_id)

        docker_cmd.append(self.container_image)
        wrap_for_container(docker_cmd, '"')

    # Pcocc
    elif self.container_path.endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        fu.log(
            "Using pcocc image %s" % self.container_image,
            self.out_log,
            self.global_log,
        )
        pcocc_cmd = [
            self.container_path,
            self.container_generic_command,
            "-I",
            self.container_image,
        ]
        if self.container_working_dir:
            pcocc_cmd.append("--cwd")
            pcocc_cmd.append(self.container_working_dir)
        if self.container_volume_path:
            pcocc_cmd.append("--mount")
            pcocc_cmd.append(host_volume + ":" + self.container_volume_path)
        if self.container_user_id:
            pcocc_cmd.append("--user")
            pcocc_cmd.append(self.container_user_id)

        # pcocc gets backslash-escaped quotes around the inner command.
        wrap_for_container(pcocc_cmd, '\\"')

    # Local execution: self.cmd is used as it is.

412 

def execute_command(self):
    """Launch ``self.cmd`` through ``cmd_wrapper.CmdWrapper`` and store its result in ``self.return_code``.

    With ``chdir_sandbox`` the command is launched from the staged unique
    directory; the previous working directory is restored afterwards,
    also when the launch raises.
    """
    previous_cwd = os.getcwd()
    if self.chdir_sandbox:
        os.chdir(self.stage_io_dict["unique_dir"])

    try:
        self.return_code = cmd_wrapper.CmdWrapper(
            cmd=self.cmd,
            shell_path=self.shell_path,
            out_log=self.out_log,
            err_log=self.err_log,
            global_log=self.global_log,
            env=self.env_vars_dict,
            timeout=self.timeout,
            disable_logs=self.disable_logs
        ).launch()
    finally:
        # Never leave the process inside the sandbox: it is listed in
        # tmp_files and later code expects the original directory.
        if self.chdir_sandbox:
            os.chdir(previous_cwd)

432 

def run_biobb(self):
    """Build the final command line and then execute it."""
    # Order matters: execute_command launches whatever create_cmd_line left in self.cmd.
    self.create_cmd_line()
    self.execute_command()

436 

def copy_to_host(self):
    """Copy output files from the sandbox to the host system.

    For every staged output the matching destination is read from
    ``self.io_dict["out"]``. Outputs that are not set, or whose file was
    not produced in the sandbox, are skipped silently.
    """
    for file_ref, file_path in self.stage_io_dict["out"].items():
        # Skip unset optional outputs before building any Path from them.
        if not file_path:
            continue
        dest_path = Path(self.io_dict["out"][file_ref])
        sandbox_root = Path(self.stage_io_dict["unique_dir"])

        # Undocumented references are handled as regular files.
        arg_doc = self.doc_arguments_dict.get(file_ref) or {}
        if arg_doc.get('type') == 'dir':
            # If the output is a directory, copy it from the sandbox as a tree
            sandbox_dir_path = sandbox_root.joinpath(file_path)
            fu.log(f"Copy directory to host: {sandbox_dir_path} --> {dest_path}", self.out_log, self.global_log)
            fu.copytree_new_files_only(sandbox_dir_path, dest_path)
            continue

        sandbox_file_path = sandbox_root.joinpath(Path(file_path).name)
        # Ensure file exists in the sandbox
        if not sandbox_file_path.exists():
            continue
        # Only copy if destination doesn't exist or is different from source
        if not dest_path.exists() or not sandbox_file_path.samefile(dest_path):
            shutil.copy2(sandbox_file_path, dest_path)

458 

def create_tmp_file(self, extension: str) -> Union[str, Path]:
    """Create a temporary file in the unique directory. These files are
    removed when self.remove_tmp_files is called.

    Args:
        extension (str): Extension handed to ``fu.create_unique_file_path``.

    Returns:
        The path produced by ``fu.create_unique_file_path``; it is also
        appended to ``self.tmp_files``.
    """
    tmp_file = fu.create_unique_file_path(self.stage_io_dict["unique_dir"], extension)
    self.tmp_files.append(tmp_file)
    return tmp_file

465 

def create_tmp_dir(self) -> Union[str, Path]:
    """Create a temporary directory in the unique directory. These directories are
    removed when self.remove_tmp_files is called.

    Returns:
        The path produced by ``fu.create_unique_dir`` (prefix ``tmpdir_``);
        it is also appended to ``self.tmp_files``.
    """
    tmp_dir = fu.create_unique_dir(self.stage_io_dict["unique_dir"], "tmpdir_", self.out_log)
    self.tmp_files.append(tmp_dir)
    return tmp_dir

472 

def remove_tmp_files(self):
    """Remove the registered temporary files when ``self.remove_tmp`` is set.

    The current working directory is always dropped from
    ``self.tmp_files`` first, so it can never be handed to
    ``fu.rm_file_list``.
    """
    # Make sure current directory is not in the tmp_files list.
    # Entries may be str or Path, so compare their string form, and drop
    # every occurrence (in place, the list object is kept).
    current_dir = os.getcwd()
    self.tmp_files[:] = [entry for entry in self.tmp_files if str(entry) != current_dir]

    if self.remove_tmp:
        fu.rm_file_list(self.tmp_files, self.out_log)

480 

@classmethod
def get_main(cls, launcher, description, custom_flags=None):
    """Get command line execution of this building block. Please check the command line documentation.

    Args:
        launcher (callable): Called with one keyword argument per parsed
            command line argument plus ``properties``.
        description (str): Description shown by the argument parser.
        custom_flags (dict): Optional mapping from argument name to the
            short flag to use for it.

    Returns:
        callable: A ``main()`` function. Nothing is parsed or executed
        until that function is called.
    """
    def main():
        # Get the arguments and properties from the class docstring
        doc_arguments_dict, _ = fu.get_doc_dicts(cls.__doc__)
        # Create the argument parser
        parser = argparse.ArgumentParser(description=description,
                                         formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
        parser.add_argument(
            "-c", "--config", required=False,
            help="This file can be a YAML file, JSON file or JSON string",
        )
        required_args = parser.add_argument_group("required arguments")
        optional_args = parser.add_argument_group("optional arguments")
        # Use the doc_arguments_dict to add arguments to the parser
        # If we have only one input or output argument, we can use shorthand flags -i/-o
        input_args = [arg for arg, arg_dict in doc_arguments_dict.items() if arg_dict.get("input_output", "").lower().startswith("input")]
        output_args = [arg for arg, arg_dict in doc_arguments_dict.items() if arg_dict.get("input_output", "").lower().startswith("output")]

        for argument, argument_dict in doc_arguments_dict.items():
            # The long flag always exists; a short one may be put in front of it
            shorthand_flags = [f'--{argument}']

            # Check if custom flags are provided for this argument
            if custom_flags and argument in custom_flags:
                shorthand_flags.insert(0, custom_flags[argument])
            elif len(input_args) == 1 and argument in input_args:
                shorthand_flags.insert(0, '-i')
            elif len(output_args) == 1 and argument in output_args:
                shorthand_flags.insert(0, '-o')
            accepted_formats = ', '.join((argument_dict.get('formats') or {}).keys())
            help_str = argument_dict.get("description", "") + f". Accepted formats: {accepted_formats}."
            # An argument without the "optional" entry is treated as required.
            if argument_dict.get("optional", False):
                optional_args.add_argument(*shorthand_flags, required=False, help=help_str)
            else:
                required_args.add_argument(*shorthand_flags, required=True, help=help_str)
        # Parse the arguments from the command line
        args = parser.parse_args()
        args.config = args.config or "{}"
        # Get the properties from the configuration yaml
        properties = settings.ConfReader(config=args.config).get_prop_dic()
        args_dict = vars(args)
        args_dict.pop('config', None)
        # Remove keys with None values from args_dict
        args_dict = {k: v for k, v in args_dict.items() if v is not None}
        # Run the building block with the parsed arguments and properties
        launcher(**args_dict, properties=properties)
    # Return the function without executing it
    return main

528 return main