Coverage for biobb_common / biobb_common / generic / biobb_object.py: 49%
268 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 13:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 13:18 +0000
1"""Module containing the BiobbObject generic parent class."""
2import difflib
3import importlib
4import os
5import shutil
6import warnings
7import argparse
8from logging import Logger
9from pathlib import Path
10from pydoc import locate
11from sys import platform
12from typing import Optional, Union
13from biobb_common.configuration import settings
14from biobb_common.command_wrapper import cmd_wrapper
15from biobb_common.tools import file_utils as fu
16from biobb_common import biobb_global_properties
class BiobbObject:
    """
    | biobb_common BiobbObject
    | Generic parent class for the rest of the Biobb classes.
    | The BiobbObject class contains all the properties and methods that are common to all the biobb blocks.

    Args:
        properties (dict - Python dictionary object containing the tool parameters, not input/output files):

            * **io_dict** (*dict*) - ({}) Input/Output files dictionary.
            * **container_path** (*str*) - (None) Path to the binary executable of your container.
            * **container_image** (*str*) - (None) Container Image identifier.
            * **container_volume_path** (*str*) - ("/data") Path to an internal directory in the container.
            * **container_working_dir** (*str*) - (None) Path to the internal CWD in the container.
            * **container_user_id** (*str*) - (None) User number id to be mapped inside the container.
            * **container_shell_path** (*str*) - ("/bin/bash -c") Path to the binary executable of the container shell.
            * **container_generic_command** (*str*) - ("run") Which command typically run or exec will be used to execute your image.
            * **stage_io_dict** (*dict*) - ({}) Stage Input/Output files dictionary.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
            * **disable_sandbox** (*bool*) - (False) Disable the use of temporal unique directories aka sandbox. Only for local execution.
            * **global_properties_list** (*list*) - ([]) list of global properties.
            * **chdir_sandbox** (*bool*) - (False) Change directory to the sandbox using just file names in the command line. Only for local execution.
            * **binary_path** (*str*) - ('') Path to the binary executable.
            * **disable_logs** (*bool*) - (False) Disable the logs.
            * **global_log** (*Logger object*) - (None) Log from the main workflow.
            * **out_log** (*Logger object*) - (None) Log from the step.
            * **err_log** (*Logger object*) - (None) Error log from the step.
            * **out_log_path** (*str*) - (None) Path to the log file.
            * **err_log_path** (*str*) - (None) Path to the error log file.
            * **can_write_console_log** (*bool*) - (True) Can write console log.
            * **can_write_file_log** (*bool*) - (True) Can write file log.
            * **prefix** (*str*) - (None) Prefix if provided.
            * **step** (*str*) - (None) Name of the step.
            * **path** (*str*) - ('') Absolute path to the step working dir.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **cmd** (*list*) - ([]) Command line list, NOT read from the dictionary.
            * **return_code** (*int*) - (0) Return code of the command execution, NOT read from the dictionary.
            * **timeout** (*int*) - (None) Timeout for the execution of the command.
            * **tmp_files** (*list*) - ([]) list of temporal files, NOT read from the dictionary.
            * **env_vars_dict** (*dict*) - ({}) Environment Variables dictionary.
            * **shell_path** (*str*) - ("/bin/bash") Path to the binary executable of the shell.
            * **dev** (*str*) - (None) Development options.
            * **check_extensions** (*bool*) - (True) Check extensions of the input/output files.
            * **check_var_typing** (*bool*) - (True) Check typing of the input/output files.
            * **locals_var_dict** (*dict*) - ({}) Local variables dictionary.
            * **doc_arguments_dict** (*dict*) - ({}) Documentation arguments dictionary.
            * **doc_properties_dict** (*dict*) - ({}) Documentation properties dictionary.


    """

    def __init__(self, properties=None, **kwargs) -> None:  # type: ignore
        """Initialise the attributes shared by every building block.

        Args:
            properties: Optional dictionary of tool parameters. It is merged
                on top of ``biobb_global_properties.dict()``, so a key given
                here overrides the global one. ``None`` means "no local
                properties".
            **kwargs: Accepted but not read by this constructor.
        """
        # Merge global properties, prioritising local ones. The parentheses are
        # required: ``|`` binds tighter than ``or``, so the unparenthesised form
        # evaluates ``dict | None`` and raises TypeError when properties is None.
        properties = biobb_global_properties.dict() | (properties or {})

        # Input/Output files
        self.io_dict: dict[str, dict[str, str]] = {"in": {}, "out": {}}

        # container Specific
        self.container_path: Optional[str] = properties.get("container_path")
        self.container_image: str = properties.get("container_image", '')
        self.container_volume_path: str = properties.get("container_volume_path", "/data")
        self.container_working_dir: Optional[str] = properties.get("container_working_dir")
        self.container_user_id: Optional[str] = properties.get("container_user_id")
        self.container_shell_path: str = properties.get("container_shell_path", "/bin/bash -c")
        self.container_generic_command: str = properties.get("container_generic_command", "run")

        # stage
        self.stage_io_dict: dict[str, dict[str, str]] = {"in": {}, "out": {}}
        self.sandbox_path: Union[str, Path] = properties.get("sandbox_path", Path().cwd())
        self.disable_sandbox: bool = properties.get("disable_sandbox", False)

        # Properties common in all BB
        self.global_properties_list: list[str] = properties.get("global_properties_list", [])
        self.chdir_sandbox: bool = properties.get("chdir_sandbox", False)
        self.binary_path: str = properties.get("binary_path", '')
        self.disable_logs: bool = properties.get("disable_logs", False)
        self.global_log: Optional[Logger] = properties.get("global_log", None)
        self.out_log: Optional[Logger] = None
        self.err_log: Optional[Logger] = None
        self.out_log_path: Optional[Union[Path, str]] = properties.get("out_log_path", None)
        self.err_log_path: Optional[Union[Path, str]] = properties.get("err_log_path", None)
        self.can_write_console_log: bool = properties.get("can_write_console_log", True)
        self.can_write_file_log: bool = properties.get("can_write_file_log", True)
        self.prefix: Optional[str] = properties.get("prefix", None)
        self.step: Optional[str] = properties.get("step", None)
        self.path: str = properties.get("path", "")
        self.remove_tmp: bool = properties.get("remove_tmp", True)
        self.restart: bool = properties.get("restart", False)
        self.cmd: list[str] = []
        self.return_code: int = 0
        self.timeout: Optional[int] = properties.get("timeout", None)
        self.tmp_files: list[Union[str, Path]] = []
        self.env_vars_dict: dict = properties.get("env_vars_dict", {})
        self.shell_path: Union[str, Path] = properties.get("shell_path", os.getenv("SHELL", "/bin/bash"))
        self.dev: Optional[str] = properties.get("dev", None)
        self.check_extensions: bool = properties.get("check_extensions", True)
        self.check_var_typing: bool = properties.get("check_var_typing", True)
        self.locals_var_dict: dict[str, str] = dict()
        # The class docstring is parsed at runtime, so its layout is part of the contract.
        self.doc_arguments_dict, self.doc_properties_dict = fu.get_doc_dicts(self.__doc__)

        # Best effort: the version is read from the top-level package of the
        # concrete class; any failure simply leaves it unset.
        try:
            self.version = importlib.import_module(
                self.__module__.split(".")[0]
            ).__version__
        except Exception:
            self.version = None

    def check_arguments(
        self,
        output_files_created: bool = False,
        raise_exception: bool = True
    ):
        """Run ``fu.check_argument`` over every documented argument.

        The value of each argument is looked up in ``self.locals_var_dict``;
        an unset (falsy) value is passed on as ``path=None``.

        Args:
            output_files_created: Forwarded to ``fu.check_argument``. When
                true, an empty line is also written to the logs afterwards.
            raise_exception: Forwarded to ``fu.check_argument``.
        """
        for argument, argument_dict in self.doc_arguments_dict.items():
            argument_value = self.locals_var_dict.get(argument)
            fu.check_argument(
                path=Path(argument_value) if argument_value else None,
                argument=argument,
                optional=argument_dict.get("optional", False),
                module_name=self.__module__,
                input_output=argument_dict.get("input_output", "").lower().strip(),
                output_files_created=output_files_created,
                type=argument_dict.get("type", None),
                # A missing "formats" entry becomes an empty list instead of
                # crashing on list(None).
                extension_list=list(argument_dict.get("formats") or []),
                check_extensions=self.check_extensions,
                raise_exception=raise_exception,
                out_log=self.out_log,
            )
        if output_files_created:
            fu.log("", self.out_log, self.global_log)

    def check_properties(
        self,
        properties: dict,
        reserved_properties: Optional[set[str]] = None,
        check_var_typing: bool = False,
    ):
        """Warn about unknown properties and, optionally, about mistyped ones.

        A property is unknown when its name is not an instance attribute and
        is not reserved. Nothing is raised; every finding is emitted through
        ``warnings.warn``.

        Args:
            properties: Dictionary of properties supplied by the caller.
            reserved_properties: Extra names that must never be reported as
                unknown. ``"system"``, ``"working_dir_path"``, ``"tool"`` and
                the entries of ``self.global_properties_list`` are always
                reserved.
            check_var_typing: When true, each value is compared against the
                type documented in ``self.doc_properties_dict``.
        """
        reserved = {"system", "working_dir_path", "tool"}
        reserved |= set(reserved_properties or ())
        reserved |= set(self.global_properties_list)

        # Check types
        if check_var_typing and self.doc_properties_dict:
            for prop, value in properties.items():
                prop_doc = self.doc_properties_dict.get(prop)
                if not prop_doc:
                    continue
                property_type = prop_doc.get("type")
                if not property_type:
                    # No documented type: nothing to compare against, and
                    # pydoc.locate() cannot resolve None.
                    continue
                located = locate(property_type)
                # When the name resolves to a plain class, compare against it;
                # otherwise compare against the class of the resolved object.
                expected = located if type(located) is type else type(located)
                if not isinstance(value, expected):  # type: ignore
                    warnings.warn(
                        f"Warning: {prop} property type not recognized. Got {type(value)} Expected {located}"
                    )

        known_properties = self.__dict__.keys()
        error_properties = {prop for prop in properties if prop not in known_properties}
        error_properties -= reserved
        for error_property in error_properties:
            # The very low cutoff means a suggestion is offered almost always.
            matches = difflib.get_close_matches(
                error_property, known_properties, n=1, cutoff=0.01
            )
            close_property = matches[0] if matches else ""
            warnings.warn(
                "Warning: %s is not a recognized property. The most similar property is: %s"
                % (error_property, close_property)
            )

    def check_init(self, properties):
        """Check that the arguments and properties passed have the correct types and formats."""
        self.check_properties(properties)
        self.check_arguments()

    def check_restart(self) -> bool:
        """Log the module version and tell whether this step can be skipped.

        Returns:
            ``True`` when ``self.restart`` is set and
            ``fu.check_complete_files`` accepts the output files, ``False``
            otherwise.
        """
        if self.version:
            fu.log(
                f"Module: {self.__module__} Version: {self.version}",
                self.out_log, self.global_log
            )

        if self.restart and fu.check_complete_files(self.io_dict["out"].values()):  # type: ignore
            fu.log("Restart is enabled, this step: %s will be skipped" % self.step, self.out_log, self.global_log)
            return True
        return False

    def stage_files(self):
        """Stage the input/output files in a temporal unique directory aka sandbox.

        With ``disable_sandbox`` the original paths are reused and the current
        working directory plays the role of the unique directory. Otherwise a
        new directory is created under ``self.sandbox_path``, existing input
        files are copied into it, and ``self.stage_io_dict`` is filled with
        the paths the command line must use.
        """
        if self.disable_sandbox:
            self.stage_io_dict = self.io_dict.copy()
            # If we are not using a sandbox, we use the current working directory as the unique directory
            self.stage_io_dict["unique_dir"] = os.getcwd()
            return

        # Create a unique directory for the sandbox
        unique_dir = str(Path(fu.create_unique_dir(path=str(self.sandbox_path), prefix="sandbox_", out_log=self.out_log)).resolve())
        self.stage_io_dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

        # Only remove unique_dir if using sandbox
        self.tmp_files.append(unique_dir)

        for io in ("in", "out"):
            for file_ref, file_path in self.io_dict.get(io, {}).items():
                if not file_path:
                    # Skip optional files not set
                    continue
                file_path = Path(file_path)

                if not file_path.exists() and io == "in":
                    # Default IN files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
                    self.stage_io_dict[io][file_ref] = file_path.name
                    continue

                # From here on: an existing input file, or any output file.
                if io == "in":
                    fu.log(f"Copy to stage: {file_path} --> {Path(unique_dir).name}", self.out_log)
                    doc = self.doc_arguments_dict.get(file_ref)
                    if doc and doc['type'] == 'dir' and file_path.suffix != '.zip':
                        shutil.copytree(file_path, os.path.join(unique_dir, file_path.name))
                    else:
                        shutil.copy2(file_path, unique_dir)

                # Assign INTERNAL PATH to IN/OUT files
                if self.container_path:
                    # Container
                    staged_path = os.path.join(self.container_volume_path, file_path.name)
                elif self.chdir_sandbox:
                    # Local, running from inside the sandbox: bare file name
                    staged_path = file_path.name
                else:
                    # Local
                    staged_path = os.path.join(unique_dir, file_path.name)
                self.stage_io_dict[io][file_ref] = staged_path

    def _wrap_in_container(self, container_cmd: list[str], quote: str) -> None:
        """Append the shell invocation of ``self.cmd`` and install the result.

        Args:
            container_cmd: Container launcher command built so far.
            quote: Quoting characters placed around the joined command line.
        """
        if not self.cmd and not self.container_shell_path:
            fu.log(
                "WARNING: The command-line is empty your container should know what to do automatically.",
                self.out_log,
                self.global_log,
            )
        else:
            container_cmd.append(self.container_shell_path)
            container_cmd.append(quote + " ".join(self.cmd) + quote)
        self.cmd = container_cmd

    def create_cmd_line(self) -> None:
        """ The method modifies the `self.cmd` attribute in-place to contain the final
        command line that will be executed based on the container type.

        The container type is chosen from the ending of ``self.container_path``
        (``singularity``, ``docker`` or ``pcocc``); any other value leaves the
        command untouched for local execution.

        Raises:
            FileNotFoundError: When a Singularity image is missing locally and
                pulling it does not produce the expected ``.sif`` file.
        """
        # Not documented and not listed option, only for devs
        if self.dev:
            fu.log(f"Adding development options: {self.dev}",
                   self.out_log, self.global_log)
            self.cmd += self.dev.split()

        # Containers
        host_volume: str = str(self.stage_io_dict.get("unique_dir", ''))
        self.container_path = self.container_path or ""

        # Singularity
        if self.container_path.endswith("singularity"):
            fu.log(
                "Using Singularity image %s" % self.container_image,
                self.out_log,
                self.global_log,
            )
            if not Path(self.container_image).exists():
                fu.log(
                    f"{self.container_image} does not exist trying to pull it",
                    self.out_log,
                    self.global_log,
                )
                container_image_name = Path(self.container_image).with_suffix(".sif").name
                singularity_pull_cmd = [
                    self.container_path,
                    "pull",
                    "--name",
                    container_image_name,
                    self.container_image,
                ]
                pull_error = f"{' '.join(singularity_pull_cmd)} not found"
                try:
                    cmd_wrapper.CmdWrapper(
                        singularity_pull_cmd, self.shell_path, self.out_log
                    ).launch()
                    if not Path(container_image_name).exists():
                        raise FileNotFoundError(pull_error)
                except FileNotFoundError as err:
                    fu.log(pull_error, self.out_log, self.global_log)
                    raise FileNotFoundError(pull_error) from err
                self.container_image = container_image_name

            singularity_cmd = [self.container_path, self.container_generic_command]
            # If we are working on a mac skip the -e option because is still no available
            if platform != "darwin":
                singularity_cmd.append("-e")

            if self.env_vars_dict:
                singularity_cmd.append("--env")
                singularity_cmd.append(
                    ",".join(
                        f"{env_var_name}='{env_var_value}'"
                        for env_var_name, env_var_value in self.env_vars_dict.items()
                    )
                )

            singularity_cmd.extend(
                [
                    "--bind",
                    host_volume + ":" + self.container_volume_path,
                    self.container_image,
                ]
            )
            self._wrap_in_container(singularity_cmd, '"')

        # Docker
        elif self.container_path.endswith("docker"):
            fu.log("Using Docker image %s" % self.container_image,
                   self.out_log, self.global_log)
            docker_cmd = [self.container_path, self.container_generic_command]
            for env_var_name, env_var_value in self.env_vars_dict.items():
                docker_cmd.extend(["-e", f"{env_var_name}='{env_var_value}'"])
            if self.container_working_dir:
                docker_cmd.extend(["-w", self.container_working_dir])
            if self.container_volume_path:
                docker_cmd.extend(["-v", host_volume + ":" + self.container_volume_path])
            if self.container_user_id:
                docker_cmd.extend(["--user", self.container_user_id])
            docker_cmd.append(self.container_image)
            self._wrap_in_container(docker_cmd, '"')

        # Pcocc
        elif self.container_path.endswith("pcocc"):
            # pcocc run -I racov56:pmx cli.py mutate -h
            fu.log(
                "Using pcocc image %s" % self.container_image,
                self.out_log,
                self.global_log,
            )
            pcocc_cmd = [
                self.container_path,
                self.container_generic_command,
                "-I",
                self.container_image,
            ]
            if self.container_working_dir:
                pcocc_cmd.extend(["--cwd", self.container_working_dir])
            if self.container_volume_path:
                pcocc_cmd.extend(["--mount", host_volume + ":" + self.container_volume_path])
            if self.container_user_id:
                pcocc_cmd.extend(["--user", self.container_user_id])
            # pcocc receives the command wrapped in backslash-escaped quotes.
            self._wrap_in_container(pcocc_cmd, '\\"')

        # Local execution: self.cmd is used as it is.

    def execute_command(self):
        """Launch ``self.cmd`` and store its result in ``self.return_code``.

        When ``chdir_sandbox`` is set the command runs from inside the unique
        directory; the previous working directory is restored afterwards even
        if the launch raises.
        """
        cwd = os.getcwd()
        if self.chdir_sandbox:
            os.chdir(self.stage_io_dict["unique_dir"])

        try:
            self.return_code = cmd_wrapper.CmdWrapper(
                cmd=self.cmd,
                shell_path=self.shell_path,
                out_log=self.out_log,
                err_log=self.err_log,
                global_log=self.global_log,
                env=self.env_vars_dict,
                timeout=self.timeout,
                disable_logs=self.disable_logs
            ).launch()
        finally:
            if self.chdir_sandbox:
                os.chdir(cwd)

    def run_biobb(self):
        """Build the final command line and execute it."""
        self.create_cmd_line()
        self.execute_command()

    def copy_to_host(self):
        """Copy output files from the sandbox to the host system."""
        unique_dir = Path(self.stage_io_dict["unique_dir"])
        for file_ref, file_path in self.stage_io_dict["out"].items():
            if not file_path:
                # Optional output not set: there is nothing to copy and no
                # destination path to build.
                continue
            dest_path = Path(self.io_dict["out"][file_ref])

            # Undocumented references are treated as plain files.
            doc = self.doc_arguments_dict.get(file_ref) or {}
            if doc.get('type') == 'dir':
                sandbox_dir_path = unique_dir.joinpath(file_path)
                fu.log(f"Copy directory to host: {sandbox_dir_path} --> {dest_path}", self.out_log, self.global_log)
                fu.copytree_new_files_only(sandbox_dir_path, dest_path)
                continue

            sandbox_file_path = unique_dir.joinpath(Path(file_path).name)
            # Ensure file exists in the sandbox
            if not sandbox_file_path.exists():
                continue
            # Only copy if destination doesn't exist or is different from source
            if not dest_path.exists() or not sandbox_file_path.samefile(dest_path):
                shutil.copy2(sandbox_file_path, dest_path)

    def create_tmp_file(self, extension: str) -> str:
        """Create a temporary file in the unique directory. These files are
        removed when self.remove_tmp_files is called.

        Args:
            extension: Forwarded to ``fu.create_unique_file_path``.

        Returns:
            The value returned by ``fu.create_unique_file_path``, which is
            also registered in ``self.tmp_files``.
        """
        tmp_file = fu.create_unique_file_path(self.stage_io_dict["unique_dir"], extension)
        self.tmp_files.append(tmp_file)
        return tmp_file

    def create_tmp_dir(self) -> str:
        """Create a temporary directory in the unique directory. These directories are
        removed when self.remove_tmp_files is called.

        Returns:
            The value returned by ``fu.create_unique_dir``, which is also
            registered in ``self.tmp_files``.
        """
        tmp_dir = fu.create_unique_dir(self.stage_io_dict["unique_dir"], "tmpdir_", self.out_log)
        self.tmp_files.append(tmp_dir)
        return tmp_dir

    def remove_tmp_files(self):
        """Remove the registered temporary files when ``remove_tmp`` is set.

        The current working directory is always dropped from the list first.
        """
        # Make sure current directory is not in the tmp_files list. Entries may
        # be str or Path, so compare their string form, and drop every match.
        # The list is updated in place so other references to it stay valid.
        cwd = os.getcwd()
        self.tmp_files[:] = [tmp for tmp in self.tmp_files if str(tmp) != cwd]

        if self.remove_tmp:
            fu.rm_file_list(self.tmp_files, self.out_log)

    @classmethod
    def get_main(cls, launcher, description, custom_flags=None):
        """Get command line execution of this building block. Please check the command line documentation.

        Args:
            launcher: Callable invoked with the parsed arguments as keywords
                plus ``properties``.
            description: Text shown by the argument parser.
            custom_flags: Optional mapping from argument name to the short
                flag to use for it.

        Returns:
            A zero-argument ``main`` function that parses the command line and
            calls ``launcher``.
        """
        def main():
            # Get the arguments and properties from the class docstring
            doc_arguments_dict, _ = fu.get_doc_dicts(cls.__doc__)
            # Create the argument parser
            parser = argparse.ArgumentParser(description=description,
                                             formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
            parser.add_argument(
                "-c", "--config", required=False,
                help="This file can be a YAML file, JSON file or JSON string",
            )
            required_args = parser.add_argument_group("required arguments")
            optional_args = parser.add_argument_group("optional arguments")
            # Use the doc_arguments_dict to add arguments to the parser
            # If we have only one input or output argument, we can use shorthand flags -i/-o
            input_args = [arg for arg, arg_dict in doc_arguments_dict.items() if arg_dict.get("input_output", "").lower().startswith("input")]
            output_args = [arg for arg, arg_dict in doc_arguments_dict.items() if arg_dict.get("input_output", "").lower().startswith("output")]

            for argument, argument_dict in doc_arguments_dict.items():
                flags = [f'--{argument}']

                # Custom flags take precedence over the automatic -i/-o shorthands
                if custom_flags and argument in custom_flags:
                    flags.insert(0, custom_flags[argument])
                elif len(input_args) == 1 and argument in input_args:
                    flags.insert(0, '-i')
                elif len(output_args) == 1 and argument in output_args:
                    flags.insert(0, '-o')

                formats = argument_dict.get("formats") or {}
                help_str = argument_dict.get("description", "") + f". Accepted formats: {', '.join(formats)}."
                # An argument with no "optional" entry is treated as required,
                # matching the default used by check_arguments.
                if argument_dict.get("optional", False):
                    optional_args.add_argument(*flags, required=False, help=help_str)
                else:
                    required_args.add_argument(*flags, required=True, help=help_str)

            # Parse the arguments from the command line
            args = parser.parse_args()
            args.config = args.config or "{}"
            # Get the properties from the configuration yaml
            properties = settings.ConfReader(config=args.config).get_prop_dic()
            args_dict = vars(args)
            args_dict.pop('config', None)
            # Remove keys with None values from args_dict
            args_dict = {k: v for k, v in args_dict.items() if v is not None}
            # Run the building block with the parsed arguments
            launcher(**args_dict, properties=properties)
        return main