Coverage for biobb_common/biobb_common/tools/file_utils.py: 45%

383 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 11:32 +0000

1"""Tools to work with files 

2""" 

3import difflib 

4import functools 

5import logging 

6import os 

7import errno 

8import pathlib 

9import re 

10import shutil 

11import uuid 

12import warnings 

13import zipfile 

14from sys import platform 

15from pathlib import Path 

16import typing 

17from typing import Optional, Union 

18import sys 

19 

20 

def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str:
    """Return an absolute path to a not-yet-existing file with a random name.

    Args:
        parent_dir (str): (current working directory) Directory that will contain the file.
        extension (str): ('') Extension appended to the random file name.

    Returns:
        str: Absolute path that does not collide with any existing file.
    """
    base_dir = Path(parent_dir).resolve() if parent_dir else Path.cwd().resolve()
    suffix = extension if extension else ""
    # Loop until a uuid4-based name is free; collisions are astronomically rare.
    while True:
        candidate = base_dir.joinpath(f"{uuid.uuid4()}{suffix}")
        if not candidate.exists():
            return str(candidate)

31 

32 

def create_dir(dir_path: str) -> str:
    """Returns the directory **dir_path** and create it if path does not exist.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: Directory dir path.
    """
    target = Path(dir_path)
    if not target.exists():
        # parents=True builds intermediate directories as well.
        target.mkdir(exist_ok=True, parents=True)
    return str(target)

45 

46 

def create_stdin_file(intput_string: str) -> str:
    """Dump **intput_string** into a freshly named ``.stdin`` file in the CWD.

    Args:
        intput_string (str): Content written verbatim to the file.

    Returns:
        str: Path of the created ``.stdin`` file.
    """
    stdin_path = create_unique_file_path(extension=".stdin")
    with open(stdin_path, "w") as handler:
        handler.write(intput_string)
    return stdin_path

52 

53 

def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name. If the
    computed name collides with an existing file name it attemps
    **number_attempts** times to create another unique id and create
    the directory with the new name.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) number of times creating the directory if there's a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.

    Raises:
        FileExistsError: If no unique directory could be created after
            **number_attempts** attempts.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    # Capture the umask ONCE and always restore it: the original code only
    # restored it on the success path (and re-captured 0 on every retry),
    # leaking a fully-permissive umask to the rest of the process on failure.
    oldumask = os.umask(0)
    try:
        for i in range(number_attempts):
            try:
                Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
                if out_log:
                    out_log.info("%s directory successfully created" % new_dir)
                return new_dir
            except OSError:
                if out_log:
                    out_log.info(new_dir + " Already exists")
                    out_log.info("Retrying %i times more" % (number_attempts - i))
                # Second and later attempts use the hex form of the uuid.
                new_dir = prefix + str(uuid.uuid4().hex)
                if path:
                    new_dir = str(Path(path).joinpath(new_dir))
                if out_log:
                    out_log.info("Trying with: " + new_dir)
        raise FileExistsError
    finally:
        os.umask(oldumask)

95 

96 

def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str:
    """Return the directory **working_dir_path** and create it if working_dir_path
    does not exist. If **working_dir_path** exists a consecutive numerical suffix
    is added to the end of the **working_dir_path** and is returned.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If step result exists do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    resolved = str(Path(working_dir_path).resolve())

    if restart or not Path(resolved).exists():
        return resolved

    # Directory exists: bump a trailing "_<n>" suffix until the name is free.
    counter = 1
    candidate = resolved
    while Path(candidate).exists():
        candidate = re.split(r"_[0-9]+$", candidate)[0] + "_" + str(counter)
        counter += 1
    return candidate

124 

125 

def zip_list(
    zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    ordered = sorted(file_list)
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as archive:
        used_names = []
        for idx, item in enumerate(ordered):
            arc_name = Path(item).name
            # Disambiguate duplicate base names with a positional prefix.
            if arc_name in used_names:
                arc_name = "file_" + str(idx) + "_" + arc_name
            used_names.append(arc_name)
            archive.write(item, arcname=arc_name)
    if out_log:
        out_log.info("Adding:")
        out_log.info(str(ordered))
        out_log.info("to: " + str(Path(zip_file).resolve()))

151 

152 

def unzip_list(
    zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Extract all files in the zipball file and return a list containing the
    absolute path of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): (None) Path to directory where the files will be extracted.
            When None the files are extracted into the current working directory.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files.
    """
    # ZipFile.extractall(path=None) extracts into the CWD; build the returned
    # paths from the same base. The original used Path(str(dest_dir)) which
    # produced bogus "None/<file>" paths when dest_dir was omitted.
    base_dir = Path(dest_dir) if dest_dir else Path.cwd()
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        file_list = [str(base_dir.joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list

177 

178 

def search_topology_files(
    top_file: Union[str, Path], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Search the top and itp files to create a list of the topology files

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files.
    """
    parent_dir = str(Path(top_file).parent)
    include_pattern = re.compile(r"#include\s+\"(.+)\"")
    collected: list[str] = []
    if not Path(top_file).exists():
        # Missing files are skipped (e.g. force-field includes living elsewhere).
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return collected
    with open(top_file) as handle:
        for raw_line in handle:
            hit = include_pattern.match(raw_line.strip())
            if hit:
                # Recurse into each #include, resolved relative to this file.
                nested = str(Path(parent_dir).joinpath(hit.group(1)))
                collected += search_topology_files(nested, out_log)
    return collected + [str(top_file)]

206 

207 

def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> list[str]:
    """Compress all *.itp and *.top files in the cwd into **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Remove the topology files once zipped.

    Returns:
        :obj:`list` of :obj:`str`: list of compressed paths.
    """
    topology_files = search_topology_files(top_file, out_log)
    zip_list(zip_file, topology_files, out_log)
    if remove_original_files:
        rm_file_list(topology_files, out_log)
    return topology_files

230 

231 

def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[Union[pathlib.Path, str]] = None,
) -> str:
    """Extract all files in the zip_file and copy the file extracted ".top" file to top_file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted.

    Returns:
        str: Path to the extracted ".top" file.
    """
    destination = unique_dir if unique_dir else create_unique_dir()
    extracted = unzip_list(zip_file, destination, out_log)
    # The zipball is expected to contain exactly one ".top"; grab the first.
    top_path = next(name for name in extracted if name.endswith(".top"))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for extracted_name in extracted:
            out_log.info(extracted_name)
    return top_path

258 

259 

def get_logs_prefix():
    """Return the indentation prefix (four spaces) used for global-log messages."""
    return " " * 4

262 

263 

def create_incremental_name(path: Union[Path, str]) -> str:
    """Increment the name of the file by adding a number at the end.

    Args:
        path (str): path of the file.

    Returns:
        str: Incremented name of the file.
    """
    candidate = Path(path)
    counter = 1
    # Keep bumping the numeric suffix until the name is free; any existing
    # trailing digits/underscores in the stem are replaced by the new suffix.
    while candidate.exists():
        bare_stem = candidate.stem.rstrip("0123456789_")
        candidate = candidate.with_name(f"{bare_stem}_{counter}{candidate.suffix}")
        counter += 1
    return str(candidate)

280 

281 

def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    out_log_path: Optional[Union[str, Path]] = None,
    err_log_path: Optional[Union[str, Path]] = None,
    level: str = "INFO",
    light_format: bool = False,
) -> tuple[logging.Logger, logging.Logger]:
    """Get the error and and out Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        out_log_path (str): (None) Path to the out log file.
        err_log_path (str): (None) Path to the err log file.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    prefix = prefix if prefix else ""
    step = step if step else ""
    path = path if path else str(Path.cwd())

    out_log_path = out_log_path or "log.out"
    err_log_path = err_log_path or "log.err"

    # Relative paths are composed under *path* and made collision-free;
    # absolute paths are used as given.
    if not Path(out_log_path).is_absolute():
        out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path)))
    if not Path(err_log_path).is_absolute():
        err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path)))

    # Create the parent directory of BOTH log files (previously only the out
    # log directory was created, so an err log in a different directory failed).
    create_dir(str(Path(out_log_path).resolve().parent))
    create_dir(str(Path(err_log_path).resolve().parent))

    # Create logging format
    logFormatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    )
    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")

    # Loggers are keyed by the log-file path so repeated calls reuse them.
    out_Logger = logging.getLogger(str(out_log_path))
    err_Logger = logging.getLogger(str(err_log_path))

    # Create FileHandler
    out_fileHandler = logging.FileHandler(
        out_log_path, mode="a", encoding=None, delay=False
    )
    err_fileHandler = logging.FileHandler(
        err_log_path, mode="a", encoding=None, delay=False
    )

    # Assign format to FileHandler
    out_fileHandler.setFormatter(logFormatter)
    err_fileHandler.setFormatter(logFormatter)

    # Attach the file handler to each logger independently (previously only
    # the out logger's handler list was checked, so the err logger could end
    # up without any handler when the out logger already had one).
    if not out_Logger.handlers:
        out_Logger.addHandler(out_fileHandler)
    if not err_Logger.handlers:
        err_Logger.addHandler(err_fileHandler)

    # Create consoleHandler
    consoleHandler = logging.StreamHandler(stream=sys.stdout)
    # Assign format to consoleHandler
    consoleHandler.setFormatter(logFormatter)

    # Mirror messages on stdout; at most one console handler per logger.
    if can_write_console:
        if len(out_Logger.handlers) < 2:
            out_Logger.addHandler(consoleHandler)
        if len(err_Logger.handlers) < 2:
            err_Logger.addHandler(consoleHandler)

    # Set logging level
    out_Logger.setLevel(level)
    err_Logger.setLevel(level)
    return out_Logger, err_Logger

364 

365 

def launchlogger(func):
    """Decorator: set up out/err logs around *func* and tear them down after."""

    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        biobb = args[0]
        create_dir(create_name(path=biobb.path))
        if biobb.disable_logs:
            return func(*args, **kwargs)

        biobb.out_log, biobb.err_log = get_logs(
            path=biobb.path,
            prefix=biobb.prefix,
            step=biobb.step,
            can_write_console=biobb.can_write_console_log,
            out_log_path=biobb.out_log_path,
            err_log_path=biobb.err_log_path,
        )
        result = func(*args, **kwargs)
        # Iterate over a copy so removeHandler() can mutate the live list.
        for step_logger in (biobb.out_log, biobb.err_log):
            for handler in list(step_logger.handlers):
                handler.close()
                step_logger.removeHandler(handler)
        return result

    return wrapper_log

397 

398 

def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Send **string** to whichever of the two loggers are provided.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): local log object.
        global_log (:obj:`logging.Logger`): global log object.
    """
    if local_log is not None:
        local_log.info(string)
    if global_log is not None:
        # Global messages are indented to distinguish them from step headers.
        global_log.info(get_logs_prefix() + string)

412 

413 

414def human_readable_time(time_ps: int) -> str: 

415 """Transform **time_ps** to a human readable string. 

416 

417 Args: 

418 time_ps (int): Time in pico seconds. 

419 

420 Returns: 

421 str: Human readable time. 

422 """ 

423 time_units = [ 

424 "femto seconds", 

425 "pico seconds", 

426 "nano seconds", 

427 "micro seconds", 

428 "mili seconds", 

429 ] 

430 t = time_ps * 1000 

431 for tu in time_units: 

432 if t < 1000: 

433 return str(t) + " " + tu 

434 

435 t = int(t/1000) 

436 return str(time_ps) 

437 

438 

def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None):
    """Warn about every key in **properties** that is not an attribute of **obj**.

    Args:
        obj (object): Object whose ``__dict__`` defines the recognized properties.
        properties (dict): Candidate property names to validate.
        reserved_properties (:obj:`list` of :obj:`str`): (None) Extra names to accept.
    """
    reserved = list(reserved_properties) if reserved_properties else []
    known_names = obj.__dict__.keys()
    unknown = {candidate for candidate in properties if candidate not in known_names}
    # "system" and "working_dir_path" are always accepted workflow-level keys.
    unknown -= set(["system", "working_dir_path"] + reserved)
    for bad_name in unknown:
        suggestions = difflib.get_close_matches(bad_name, known_names, n=1, cutoff=0.01)
        suggestion = suggestions[0] if suggestions else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (bad_name, suggestion)
        )

455 

456 

def create_name(
    path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None,
    step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Return file name.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    segments = []
    if prefix:
        # Slashes in the prefix would create spurious directories.
        segments.append(prefix.replace("/", "_"))
    if step:
        segments.append(step)
    cleaned = "" if name is None else name.strip()
    if cleaned:
        segments.append(cleaned)
    composed = "_".join(segments)
    if path:
        return str(Path(path).joinpath(composed)) if composed else str(path)
    return composed

490 

491 

def write_failed_output(file_name: str):
    """Overwrite **file_name** with a minimal error marker ("Error\\n")."""
    Path(file_name).write_text("Error\n")

495 

496 

def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Best-effort removal of a file or directory tree.

    Args:
        file_name (str): Path to remove.

    Returns:
        str: The removed path, or None when nothing was removed
        (missing path or any error, which is deliberately swallowed).
    """
    try:
        target = pathlib.Path(file_name)
        if target.is_dir():
            shutil.rmtree(file_name)
            return file_name
        if target.is_file():
            Path(file_name).unlink()
            return file_name
    except Exception:
        # Removal is best-effort by design; failures simply yield None.
        pass
    return None

510 

511 

def rm_file_list(
    file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Remove every path in **file_list**, returning those actually removed.

    Args:
        file_list (:obj:`list` of :obj:`str`): Paths to remove.
        out_log (:obj:`logging.Logger`): (None) Input log object.

    Returns:
        :obj:`list` of :obj:`str`: Paths that were successfully removed.
    """
    deleted = [str(candidate) for candidate in file_list if rm(candidate)]
    if out_log:
        log("Removed: %s" % str(deleted), out_log)
    return deleted

519 

520 

def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool:
    """Return True when every non-empty entry points to a non-empty regular file.

    Falsy entries (None, "") are skipped; an empty list yields True.
    """
    return all(
        Path(str(candidate)).is_file() and Path(str(candidate)).stat().st_size > 0
        for candidate in output_file_list
        if candidate
    )

527 

528 

def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str,
                      io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict:
    """Stage input files for a container run and remap all paths.

    When no container is used the io_dict is returned untouched. Otherwise,
    existing inputs are copied into a fresh unique directory (mounted into
    the container) and every in/out path is rewritten to its in-container
    location under **container_volume_path**.
    """
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files: copy to the staging dir and point to the container-side path.
    for file_ref, file_path in io_dict["in"].items():
        if not file_path:
            continue
        if Path(file_path).exists():
            shutil.copy2(file_path, unique_dir)
            log(f"Copy: {file_path} to {unique_dir}")
            container_io_dict["in"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )
        else:
            # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
            container_io_dict["in"][file_ref] = file_path

    # OUT files: only remap to the container-side path (nothing to copy yet).
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict

558 

559 

def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy container-produced output files back to their host destinations.

    No-op when **container_path** is falsy (no container was used). Outputs
    that the container did not actually produce are silently skipped.
    """
    if not container_path:
        return

    for file_ref, file_path in container_io_dict["out"].items():
        if not file_path:
            continue
        staged_copy = str(
            Path(container_io_dict["unique_dir"]).joinpath(Path(file_path).name)
        )
        if Path(staged_copy).exists():
            shutil.copy2(staged_copy, io_dict["out"][file_ref])

573 

def create_cmd_line(
    cmd: list[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None
) -> list[str]:
    """Wrap **cmd** in a container invocation (Singularity, Docker or pcocc).

    The engine is chosen from the suffix of **container_path**; when it
    matches none of the known engines the original command list is
    returned unmodified.

    Args:
        cmd (:obj:`list` of :obj:`str`): Command line as a list of tokens.
        container_path (str): ('') Path to the container engine binary.
        host_volume (str): (None) Host directory mounted inside the container.
        container_volume (str): (None) Mount point of **host_volume** inside the container.
        container_working_dir (str): (None) Working directory inside the container.
        container_user_uid (str): (None) User uid passed to the container engine.
        container_shell_path (str): (None) Shell used to run **cmd** inside the container.
        container_image (str): (None) Container image identifier or file.
        out_log (:obj:`logging.Logger`): (None) Local log object.
        global_log (:obj:`logging.Logger`): (None) Global log object.

    Returns:
        :obj:`list` of :obj:`str`: Command line ready to be executed.
    """
    container_path = container_path or ""
    if str(container_path).endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist trying to pull it",
                out_log,
                global_log,
            )
            # Pull the remote image into a local .sif file named after it.
            container_image_name = str(Path(str(container_image)).with_suffix(".sif").name)
            singularity_pull_cmd = [
                str(container_path),
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                # Imported lazily to avoid a module-level import cycle.
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch()
                if Path(container_image_name).exists():
                    container_image = container_image_name
                else:
                    raise FileNotFoundError
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError
        singularity_cmd: list[str] = [
            str(container_path),
            "exec",
            "-e",
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
        ]
        # If we are working on a mac remove -e option because is still no available
        if platform == "darwin":
            if "-e" in singularity_cmd:
                singularity_cmd.remove("-e")

        # Quote the whole inner command so the container shell receives it
        # as a single -c argument.
        cmd = ['"' + " ".join(cmd) + '"']
        singularity_cmd.extend([str(container_shell_path), "-c"])
        return singularity_cmd + cmd

    elif str(container_path).endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [str(container_path), "run"]
        if container_working_dir:
            docker_cmd.append("-w")
            docker_cmd.append(str(container_working_dir))
        if container_volume:
            docker_cmd.append("-v")
            docker_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            docker_cmd.append("--user")
            docker_cmd.append(container_user_uid)

        docker_cmd.append(str(container_image))

        # Same single-argument quoting as the Singularity branch.
        cmd = ['"' + " ".join(cmd) + '"']
        docker_cmd.extend([str(container_shell_path), "-c"])
        return docker_cmd + cmd

    elif str(container_path).endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [str(container_path), "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd.append("--cwd")
            pcocc_cmd.append(str(container_working_dir))
        if container_volume:
            pcocc_cmd.append("--mount")
            pcocc_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            pcocc_cmd.append("--user")
            pcocc_cmd.append(container_user_uid)

        # pcocc needs the inner quotes escaped so they survive its own shell layer.
        cmd = ['\\"' + " ".join(cmd) + '\\"']
        pcocc_cmd.extend([str(container_shell_path), "-c"])
        return pcocc_cmd + cmd

    else:
        # log('Not using any container', out_log, global_log)
        return cmd

671 

672 

def get_doc_dicts(doc: Optional[str]):
    """Parse a biobb-convention docstring into structured dictionaries.

    Args:
        doc (str): Docstring with an ``Args:`` section, followed by a
            ``Properties`` section and ending at an ``Examples`` section.

    Returns:
        :obj:`tuple` of :obj:`dict` and :obj:`dict`: Arguments dictionary
        (argument name -> metadata, including an extension->EDAM ``formats``
        map) and properties dictionary (property name -> metadata, including
        an optional ``values`` map).
    """
    # One compiled regex per docstring construct of the biobb doc convention.
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    # Strip blank lines and surrounding whitespace, then locate the section
    # boundaries by their heading lines.
    doc_lines = list(
        map(str.strip, filter(lambda line: line.strip(), str(doc).splitlines()))
    )
    args_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("args"), doc_lines))
    )
    properties_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("properties"), doc_lines))
    )
    examples_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("examples"), doc_lines))
    )
    arguments_lines_list = doc_lines[args_index + 1: properties_index]
    properties_lines_list = doc_lines[properties_index + 1: examples_index]

    # Parse each argument line; "formats" is expanded into an
    # extension -> EDAM-id mapping.
    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        match_argument = regex_argument.match(argument_line)
        argument_dict = match_argument.groupdict() if match_argument is not None else {}
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    # Parse each property line; an optional "Values:" tail becomes a
    # value -> description mapping.
    doc_properties_dict = {}
    for property_line in properties_lines_list:
        match_property = regex_property.match(property_line)
        property_dict = match_property.groupdict() if match_property is not None else {}
        property_dict["values"] = None
        if "Values:" in property_dict["description"]:
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:")
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict

729 

730 

def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    extension_list: Optional[list[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Validate a file argument: existence (for inputs or already-created
    outputs) and extension.

    Args:
        path (Path): Path of the argument to check; may be None when optional.
        argument (str): Argument name, used in messages.
        optional (bool): When True and **path** is falsy, skip all checks.
        module_name (str): Calling module name, used in messages.
        input_output (str): (None) 'in'/'input' or 'out'/'output'.
        output_files_created (bool): (False) When True, output files must exist.
        extension_list (:obj:`list` of :obj:`str`): (None) Accepted extensions.
        raise_exception (bool): (True) Raise instead of only warning.
        check_extensions (bool): (True) Enable the extension check.
        out_log (:obj:`logging.Logger`): (None) Input log object.

    Raises:
        FileNotFoundError: When a required file is missing or the
            input/output kind cannot be determined (and raise_exception).
    """
    if optional and not path:
        return None

    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)
        # Fix: the original fell through here and crashed with a NameError on
        # the unbound 'input_file'; without a kind no further check is possible.
        return None

    if input_file or output_files_created:
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        if not Path(str(path)).exists():
            log(not_found_error_string, out_log)
            if raise_exception:
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
                )
            warnings.warn(not_found_error_string)

    if check_extensions and extension_list:
        no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
        if not Path(str(path)).suffix:
            # Fix: pass out_log so these messages reach the step log like the
            # other diagnostics in this function.
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        else:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            if not Path(str(path)).suffix[1:].lower() in extension_list:
                log(not_valid_extension_error_string, out_log)
                warnings.warn(not_valid_extension_error_string)