Coverage for biobb_common/biobb_common/generic/biobb_object.py: 55%

218 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 11:32 +0000

1"""Module containing the BiobbObject generic parent class.""" 

2import difflib 

3import importlib 

4import os 

5import shutil 

6import warnings 

7from logging import Logger 

8from pathlib import Path 

9from pydoc import locate 

10from sys import platform 

11from typing import Any, Optional, Union 

12 

13from biobb_common.command_wrapper import cmd_wrapper 

14from biobb_common.tools import file_utils as fu 

15 

16 

class BiobbObject:
    """
    | biobb_common BiobbObject
    | Generic parent class for the rest of the Biobb classes.
    | The BiobbObject class contains all the properties and methods that are common to all the biobb blocks.

    Args:
        properties (dict - Python dictionary object containing the tool parameters, not input/output files):

            * **io_dict** (*dict*) - ({}) Input/Output files dictionary.
            * **container_path** (*str*) - (None) Path to the binary executable of your container.
            * **container_image** (*str*) - (None) Container Image identifier.
            * **container_volume_path** (*str*) - ("/data") Path to an internal directory in the container.
            * **container_working_dir** (*str*) - (None) Path to the internal CWD in the container.
            * **container_user_id** (*str*) - (None) User number id to be mapped inside the container.
            * **container_shell_path** (*str*) - ("/bin/bash -c") Path to the binary executable of the container shell.
            * **container_generic_command** (*str*) - ("run") Which command typically run or exec will be used to execute your image.
            * **stage_io_dict** (*dict*) - ({}) Stage Input/Output files dictionary.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
            * **disable_sandbox** (*bool*) - (False) Disable the use of temporal unique directories aka sandbox. Only for local execution.
            * **global_properties_list** (*list*) - ([]) list of global properties.
            * **chdir_sandbox** (*bool*) - (False) Change directory to the sandbox using just file names in the command line. Only for local execution.
            * **binary_path** (*str*) - ('') Path to the binary executable.
            * **can_write_console_log** (*bool*) - (True) Can write console log.
            * **global_log** (*Logger object*) - (None) Log from the main workflow.
            * **out_log** (*Logger object*) - (None) Log from the step.
            * **err_log** (*Logger object*) - (None) Error log from the step.
            * **out_log_path** (*str*) - (None) Path to the log file.
            * **err_log_path** (*str*) - (None) Path to the error log file.
            * **disable_logs** (*bool*) - (False) Disable the logs.
            * **prefix** (*str*) - (None) Prefix if provided.
            * **step** (*str*) - (None) Name of the step.
            * **path** (*str*) - ('') Absolute path to the step working dir.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **cmd** (*list*) - ([]) Command line list, NOT read from the dictionary.
            * **return_code** (*int*) - (0) Return code of the command execution, NOT read from the dictionary.
            * **timeout** (*int*) - (None) Timeout for the execution of the command.
            * **tmp_files** (*list*) - ([]) list of temporal files, NOT read from the dictionary.
            * **env_vars_dict** (*dict*) - ({}) Environment Variables dictionary.
            * **shell_path** (*str*) - ("/bin/bash") Path to the binary executable of the shell.
            * **dev** (*str*) - (None) Development options.
            * **check_extensions** (*bool*) - (True) Check extensions of the input/output files.
            * **check_var_typing** (*bool*) - (True) Check typing of the input/output files.
            * **locals_var_dict** (*dict*) - ({}) Local variables dictionary.
            * **doc_arguments_dict** (*dict*) - ({}) Documentation arguments dictionary.
            * **doc_properties_dict** (*dict*) - ({}) Documentation properties dictionary.


    """

    def __init__(self, properties=None, **kwargs) -> None:  # type: ignore
        """Populate the attributes shared by every building block.

        Args:
            properties (dict): Tool parameters. Keys that are missing fall back
                to the defaults coded below.
            **kwargs: Accepted but not read by this constructor.
        """
        properties = properties or {}

        # Input/Output files
        self.io_dict: dict[str, dict] = {"in": {}, "out": {}}

        # Container specific
        self.container_path: Optional[str] = properties.get("container_path")
        self.container_image: str = properties.get("container_image", '')
        self.container_volume_path: str = properties.get("container_volume_path", "/data")
        self.container_working_dir: Optional[str] = properties.get("container_working_dir")
        self.container_user_id: Optional[str] = properties.get("container_user_id")
        self.container_shell_path: str = properties.get("container_shell_path", "/bin/bash -c")
        self.container_generic_command: str = properties.get("container_generic_command", "run")

        # Stage
        self.stage_io_dict: dict[str, Any] = {"in": {}, "out": {}}
        self.sandbox_path: Union[str, Path] = properties.get("sandbox_path", Path.cwd())
        self.disable_sandbox: bool = properties.get("disable_sandbox", False)

        # Properties common in all BB
        self.global_properties_list: list[str] = properties.get("global_properties_list", [])
        self.chdir_sandbox: bool = properties.get("chdir_sandbox", False)
        self.binary_path: str = properties.get("binary_path", '')
        self.can_write_console_log: bool = properties.get("can_write_console_log", True)
        self.global_log: Optional[Logger] = properties.get("global_log", None)
        self.out_log: Optional[Logger] = None
        self.err_log: Optional[Logger] = None
        self.out_log_path: Optional[Union[Path, str]] = properties.get("out_log_path", None)
        self.err_log_path: Optional[Union[Path, str]] = properties.get("err_log_path", None)
        self.disable_logs: bool = properties.get("disable_logs", False)
        self.prefix: Optional[str] = properties.get("prefix", None)
        self.step: Optional[str] = properties.get("step", None)
        self.path: str = properties.get("path", "")
        self.remove_tmp: bool = properties.get("remove_tmp", True)
        self.restart: bool = properties.get("restart", False)
        self.cmd: list[str] = []
        self.return_code: int = 0
        self.timeout: Optional[int] = properties.get("timeout", None)
        self.tmp_files: list[Union[str, Path]] = []
        self.env_vars_dict: dict = properties.get("env_vars_dict", {})
        # The user's $SHELL wins over the hard-coded /bin/bash fallback.
        self.shell_path: Union[str, Path] = properties.get("shell_path", os.getenv("SHELL", "/bin/bash"))
        self.dev: Optional[str] = properties.get("dev", None)
        self.check_extensions: bool = properties.get("check_extensions", True)
        self.check_var_typing: bool = properties.get("check_var_typing", True)
        self.locals_var_dict: dict[str, str] = dict()
        # self.__doc__ resolves to the docstring of the concrete (sub)class.
        self.doc_arguments_dict, self.doc_properties_dict = fu.get_doc_dicts(self.__doc__)

        # Best effort: the version is informational only, so any failure to
        # import the top-level package or read __version__ leaves it as None.
        try:
            self.version = importlib.import_module(
                self.__module__.split(".")[0]
            ).__version__
        except Exception:
            self.version = None

        if self.disable_sandbox and self.remove_tmp:
            self.remove_tmp = False
            fu.log("WARNING: Disabling remove_tmp because disable_sandbox is enabled", self.out_log, self.global_log)

    def check_arguments(
        self, output_files_created: bool = False, raise_exception: bool = True
    ):
        """Run ``fu.check_argument`` on every documented input/output argument.

        Paths are read from ``self.locals_var_dict``; an argument with no
        (or an empty) entry is passed on as ``path=None``.

        Args:
            output_files_created (bool): Forwarded to ``fu.check_argument``;
                when true an empty line is logged once all arguments are checked.
            raise_exception (bool): Forwarded to ``fu.check_argument``.
        """
        for argument, argument_dict in self.doc_arguments_dict.items():
            raw_path = self.locals_var_dict.get(argument)
            fu.check_argument(
                path=Path(raw_path) if raw_path else None,
                argument=argument,
                optional=argument_dict.get("optional", False),
                module_name=self.__module__,
                input_output=argument_dict.get("input_output", "").lower().strip(),
                output_files_created=output_files_created,
                extension_list=list(argument_dict.get("formats")),
                check_extensions=self.check_extensions,
                raise_exception=raise_exception,
                out_log=self.out_log,
            )
        if output_files_created:
            fu.log("", self.out_log, self.global_log)

    def check_properties(
        self,
        properties: dict,
        reserved_properties: Optional[set[str]] = None,
        check_var_typing: bool = False,
    ):
        """Warn about mistyped or unknown entries in ``properties``.

        Nothing is raised for a bad property: every problem is reported
        through ``warnings.warn``.

        Args:
            properties (dict): The user supplied properties dictionary.
            reserved_properties (set): Extra names that must never be reported
                as unknown, on top of ``system``, ``working_dir_path``, ``tool``
                and ``self.global_properties_list``.
            check_var_typing (bool): When true, compare each value against the
                type recorded for it in ``self.doc_properties_dict``.
        """
        reserved = {"system", "working_dir_path", "tool"}
        reserved.update(reserved_properties or ())
        reserved.update(self.global_properties_list)

        # Check types
        if check_var_typing and self.doc_properties_dict:
            for prop, value in properties.items():
                prop_doc = self.doc_properties_dict.get(prop)
                if not prop_doc:
                    continue
                property_type = prop_doc.get("type")
                if not property_type:
                    # pydoc.locate() needs a dotted-name string.
                    continue
                expected = locate(property_type)
                # A plain class is used as is; anything else (including None
                # for an unresolvable name) is compared through its own type.
                classinfo = expected if type(expected) is type else type(expected)
                if not isinstance(value, classinfo):  # type: ignore
                    warnings.warn(
                        f"Warning: {prop} property type not recognized. Got {type(value)} Expected {expected}"
                    )

        known_properties = self.__dict__.keys()
        unknown_properties = {prop for prop in properties if prop not in known_properties}
        unknown_properties -= reserved
        # Sorted so that the warnings come out in a reproducible order.
        for error_property in sorted(unknown_properties):
            # The very low cutoff means some suggestion is almost always found.
            matches = difflib.get_close_matches(
                error_property, known_properties, n=1, cutoff=0.01
            )
            closest = matches[0] if matches else ""
            warnings.warn(
                "Warning: %s is not a recognized property. The most similar property is: %s"
                % (error_property, closest)
            )

    def check_restart(self) -> bool:
        """Log the module version and tell whether the step can be skipped.

        Returns:
            bool: True only when ``self.restart`` is set and
            ``fu.check_complete_files`` accepts the values of
            ``self.io_dict["out"]``; False otherwise.
        """
        if self.version:
            fu.log(
                f"Module: {self.__module__} Version: {self.version}",
                self.out_log,
                self.global_log,
            )

        if self.restart and fu.check_complete_files(self.io_dict["out"].values()):  # type: ignore
            fu.log(
                "Restart is enabled, this step: %s will the skipped" % self.step,
                self.out_log,
                self.global_log,
            )
            return True
        return False

    def stage_files(self):
        """Create the sandbox and fill ``self.stage_io_dict`` with staged paths.

        With ``disable_sandbox`` the staged dictionary is a shallow copy of
        ``self.io_dict`` and the current working directory plays the role of
        the sandbox. Otherwise a unique directory is created under
        ``self.sandbox_path``, registered in ``self.tmp_files``, and every
        existing input file is copied into it.
        """
        if self.disable_sandbox:
            # Shallow copy: the inner "in"/"out" dictionaries stay shared
            # with self.io_dict.
            self.stage_io_dict = self.io_dict.copy()
            self.stage_io_dict["unique_dir"] = os.getcwd()
            return

        unique_dir = str(
            Path(
                fu.create_unique_dir(
                    path=str(self.sandbox_path), prefix="sandbox_", out_log=self.out_log
                )
            ).resolve()
        )
        self.stage_io_dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

        # Add unique_dir to tmp_files
        self.tmp_files.append(unique_dir)

        # IN files COPY and assign INTERNAL PATH
        for file_ref, file_path in self.io_dict.get("in", {}).items():
            if not file_path:
                continue
            if Path(file_path).exists():
                shutil.copy2(file_path, unique_dir)
                fu.log(f"Copy: {file_path} to {unique_dir}", self.out_log)
                self.stage_io_dict["in"][file_ref] = self._staged_path(file_path, unique_dir)
            else:
                # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
                self.stage_io_dict["in"][file_ref] = file_path

        # OUT files assign INTERNAL PATH
        for file_ref, file_path in self.io_dict.get("out", {}).items():
            if file_path:
                self.stage_io_dict["out"][file_ref] = self._staged_path(file_path, unique_dir)

    def _staged_path(self, file_path, unique_dir: str) -> str:
        """Return the path under which ``file_path`` is seen by the command."""
        file_name = Path(file_path).name
        # Container: files are addressed through the mounted volume.
        if self.container_path:
            return str(Path(self.container_volume_path).joinpath(file_name))
        # Local with chdir: the command runs inside the sandbox, so the bare
        # file name is enough.
        if self.chdir_sandbox:
            return str(file_name)
        # Local
        return str(Path(unique_dir).joinpath(file_name))

    def create_cmd_line(self) -> None:
        """Finish ``self.cmd``, wrapping it in a container call when needed.

        The container flavour is chosen from the suffix of
        ``self.container_path`` (``singularity``, ``docker`` or ``pcocc``);
        any other value leaves ``self.cmd`` as a local command. As a side
        effect a ``None`` ``container_path`` is normalised to ``""``.

        Raises:
            FileNotFoundError: If a missing Singularity image cannot be pulled.
        """
        # Not documented and not listed option, only for devs
        if self.dev:
            fu.log(
                f"Adding development options: {self.dev}", self.out_log, self.global_log
            )
            self.cmd += self.dev.split()

        # Containers
        host_volume: str = str(self.stage_io_dict.get("unique_dir", ''))
        self.container_path = self.container_path or ""

        # Singularity
        if self.container_path.endswith("singularity"):
            fu.log(
                "Using Singularity image %s" % self.container_image,
                self.out_log,
                self.global_log,
            )
            if not Path(self.container_image).exists():
                self._pull_singularity_image()

            singularity_cmd = [self.container_path, self.container_generic_command]
            # The -e option is not available on macOS, so it is only added elsewhere.
            if platform != "darwin":
                singularity_cmd.append("-e")
            if self.env_vars_dict:
                singularity_cmd.append("--env")
                singularity_cmd.append(
                    ",".join(
                        f"{env_var_name}='{env_var_value}'"
                        for env_var_name, env_var_value in self.env_vars_dict.items()
                    )
                )
            singularity_cmd.extend(
                [
                    "--bind",
                    host_volume + ":" + self.container_volume_path,
                    self.container_image,
                ]
            )
            self.cmd = self._with_container_shell(singularity_cmd, '"')

        # Docker
        elif self.container_path.endswith("docker"):
            fu.log(
                "Using Docker image %s" % self.container_image,
                self.out_log,
                self.global_log,
            )
            docker_cmd = [self.container_path, self.container_generic_command]
            for env_var_name, env_var_value in (self.env_vars_dict or {}).items():
                docker_cmd.extend(["-e", f"{env_var_name}='{env_var_value}'"])
            if self.container_working_dir:
                docker_cmd.extend(["-w", self.container_working_dir])
            if self.container_volume_path:
                docker_cmd.extend(["-v", host_volume + ":" + self.container_volume_path])
            if self.container_user_id:
                docker_cmd.extend(["--user", self.container_user_id])
            docker_cmd.append(self.container_image)
            self.cmd = self._with_container_shell(docker_cmd, '"')

        # Pcocc
        elif self.container_path.endswith("pcocc"):
            # pcocc run -I racov56:pmx cli.py mutate -h
            fu.log(
                "Using pcocc image %s" % self.container_image,
                self.out_log,
                self.global_log,
            )
            pcocc_cmd = [
                self.container_path,
                self.container_generic_command,
                "-I",
                self.container_image,
            ]
            if self.container_working_dir:
                pcocc_cmd.extend(["--cwd", self.container_working_dir])
            if self.container_volume_path:
                pcocc_cmd.extend(["--mount", host_volume + ":" + self.container_volume_path])
            if self.container_user_id:
                pcocc_cmd.extend(["--user", self.container_user_id])
            # pcocc needs the surrounding quotes escaped.
            self.cmd = self._with_container_shell(pcocc_cmd, '\\"')

        # Local execution: self.cmd is used as it is.

    def _pull_singularity_image(self) -> None:
        """Pull ``self.container_image`` into a local ``.sif`` file and point to it."""
        fu.log(
            f"{self.container_image} does not exist trying to pull it",
            self.out_log,
            self.global_log,
        )
        container_image_name = str(Path(self.container_image).with_suffix(".sif").name)
        singularity_pull_cmd = [
            self.container_path,
            "pull",
            "--name",
            container_image_name,
            self.container_image,
        ]
        try:
            cmd_wrapper.CmdWrapper(
                singularity_pull_cmd, self.shell_path, self.out_log
            ).launch()
            if not Path(container_image_name).exists():
                raise FileNotFoundError(container_image_name)
        except FileNotFoundError as err:
            message = f"{' '.join(singularity_pull_cmd)} not found"
            fu.log(message, self.out_log, self.global_log)
            # Same exception type as before, now with a message and its cause.
            raise FileNotFoundError(message) from err
        self.container_image = container_image_name

    def _with_container_shell(self, container_cmd: list, quote: str) -> list:
        """Append the container shell and the quoted ``self.cmd`` to ``container_cmd``."""
        if not self.cmd and not self.container_shell_path:
            fu.log(
                "WARNING: The command-line is empty your container should know what to do automatically.",
                self.out_log,
                self.global_log,
            )
        else:
            container_cmd.append(self.container_shell_path)
            container_cmd.append(quote + " ".join(self.cmd) + quote)
        return container_cmd

    def execute_command(self):
        """Launch ``self.cmd`` and store its result in ``self.return_code``.

        With ``chdir_sandbox`` the command is launched from inside the
        sandbox directory; the previous working directory is restored
        afterwards even when the launch raises.
        """
        cwd = os.getcwd()
        if self.chdir_sandbox:
            os.chdir(self.stage_io_dict["unique_dir"])

        try:
            self.return_code = cmd_wrapper.CmdWrapper(
                cmd=self.cmd,
                shell_path=self.shell_path,
                out_log=self.out_log,
                err_log=self.err_log,
                global_log=self.global_log,
                env=self.env_vars_dict,
                timeout=self.timeout
            ).launch()
        finally:
            if self.chdir_sandbox:
                os.chdir(cwd)

    def copy_to_host(self):
        """Copy the output files found in the sandbox to their final paths.

        A staged output is looked up by file name inside
        ``self.stage_io_dict["unique_dir"]``. Files missing from the sandbox
        are skipped, and so is a destination that already is the very same
        file as the source.
        """
        sandbox_dir = Path(self.stage_io_dict["unique_dir"])
        for file_ref, file_path in self.stage_io_dict["out"].items():
            if not file_path:
                continue
            sandbox_file = sandbox_dir.joinpath(Path(file_path).name)
            if not sandbox_file.exists():
                continue
            destination = self.io_dict["out"][file_ref]
            # Copying a file onto itself would make shutil raise.
            if Path(destination).exists() and sandbox_file.samefile(Path(destination)):
                continue
            shutil.copy2(str(sandbox_file), destination)

    def run_biobb(self):
        """Build the final command line and execute it."""
        self.create_cmd_line()
        self.execute_command()

    def remove_tmp_files(self):
        """Remove everything listed in ``self.tmp_files`` when ``remove_tmp`` is set."""
        if self.remove_tmp:
            fu.rm_file_list(self.tmp_files, self.out_log)