Coverage for biobb_common/biobb_common/tools/file_utils.py: 43%

379 statements  

coverage.py v7.4.3, created at 2024-03-13 17:26 +0000

1"""Tools to work with files 

2""" 

3import difflib 

4import functools 

5import logging 

6import os 

7import errno 

8import pathlib 

9import re 

10import shutil 

11import uuid 

12import warnings 

13import zipfile 

14from sys import platform 

15from pathlib import Path 

16import typing 

17from typing import Optional 

18import sys 

19 

20 

def create_unique_file_path(parent_dir: Optional[str] = None, extension: Optional[str] = None) -> str:
    """Return a unique, non-existing file path inside **parent_dir** ending in **extension**."""
    if not parent_dir:
        parent_dir = Path.cwd()
    if not extension:
        extension = ""
    while True:
        name = str(uuid.uuid4()) + extension
        file_path = Path.joinpath(Path(parent_dir).resolve(), name)
        if not file_path.exists():
            return str(file_path)

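# A minimal usage sketch (illustrative only; "/tmp" is an assumed directory):
#
#     file_path = create_unique_file_path(parent_dir="/tmp", extension=".pdb")
#     # -> e.g. '/tmp/0b1f6a51-....pdb', a path guaranteed not to exist yet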

def create_dir(dir_path: str) -> str:
    """Return the directory **dir_path**, creating it if the path does not exist.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: Directory dir path.
    """
    if not Path(dir_path).exists():
        Path(dir_path).mkdir(exist_ok=True, parents=True)
    return str(Path(dir_path))


def create_stdin_file(input_string: str) -> str:
    """Write **input_string** to a unique ".stdin" file and return its path."""
    file_path = create_unique_file_path(extension=".stdin")
    with open(file_path, "w") as file_handler:
        file_handler.write(input_string)
    return file_path

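# A minimal usage sketch combining create_dir and create_stdin_file (the
# "runs" directory name is an assumption for illustration):
#
#     run_dir = create_dir("runs")              # created only if it does not exist
#     stdin_path = create_stdin_file("quit\n")  # unique *.stdin file in the cwd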

def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name. If the
    computed name collides with an existing file name, it attempts
    **number_attempts** times to compute another unique id and create
    the directory with the new name.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) Number of attempts to create the directory if there is a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    for i in range(number_attempts):
        try:
            oldumask = os.umask(0)
            Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
            if out_log:
                out_log.info("%s directory successfully created" % new_dir)
            os.umask(oldumask)
            return new_dir
        except OSError:
            os.umask(oldumask)  # Restore the umask also when mkdir fails
            if out_log:
                out_log.info(new_dir + " already exists")
                out_log.info("Retrying %i times more" % (number_attempts - i))
            new_dir = prefix + str(uuid.uuid4().hex)
            if path:
                new_dir = str(Path(path).joinpath(new_dir))
            if out_log:
                out_log.info("Trying with: " + new_dir)
    raise FileExistsError("Could not create a unique directory after %i attempts" % number_attempts)

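# A minimal usage sketch (illustrative; "scratch" is an assumed parent path):
#
#     work_dir = create_unique_dir(path="scratch", prefix="step1_")
#     # -> e.g. 'scratch/step1_9b2e4c1a...' created with mode 0o777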

def get_working_dir_path(working_dir_path: Optional[str] = None, restart: bool = False) -> str:
    """Return the absolute path of **working_dir_path**. If **working_dir_path**
    already exists and **restart** is False, a consecutive numerical suffix is
    appended to **working_dir_path** until a non-existing path is found.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If the step result exists, do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    working_dir_path = str(Path(working_dir_path).resolve())

    if (not Path(working_dir_path).exists()) or restart:
        return str(Path(working_dir_path))

    cont = 1
    while Path(working_dir_path).exists():
        working_dir_path = (
            re.split(r"_[0-9]+$", str(working_dir_path))[0] + "_" + str(cont)
        )
        cont += 1
    return str(working_dir_path)

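# A minimal usage sketch (illustrative; "results" is an assumed directory that
# already exists):
#
#     get_working_dir_path("results")                # -> '/abs/path/results_1'
#     get_working_dir_path("results", restart=True)  # -> '/abs/path/results'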

def zip_list(
    zip_file: str, file_list: typing.Iterable[str], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into the **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    file_list = sorted(file_list)  # Accept any iterable and avoid mutating the caller's list
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as zip_f:
        inserted = []
        for index, f in enumerate(file_list):
            base_name = Path(f).name
            if base_name in inserted:
                base_name = "file_" + str(index) + "_" + base_name
            inserted.append(base_name)
            zip_f.write(f, arcname=base_name)
    if out_log:
        out_log.info("Adding:")
        out_log.info(str(file_list))
        out_log.info("to: " + str(Path(zip_file).resolve()))


def unzip_list(
    zip_file: str, dest_dir: Optional[str] = None, out_log: Optional[logging.Logger] = None
) -> typing.List[str]:
    """Extract all files in the zipball file and return a list containing the
    absolute path of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): Path to the directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: List of paths of the extracted files.
    """
    dest_dir = dest_dir or str(Path.cwd())  # extractall defaults to the cwd; mirror that to build the paths
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        file_list = [str(Path(dest_dir).joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list

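# A minimal round-trip sketch (illustrative; the file names are assumptions):
#
#     zip_list("bundle.zip", ["a.itp", "b.itp"])
#     extracted = unzip_list("bundle.zip", dest_dir="extracted")
#     # -> ['extracted/a.itp', 'extracted/b.itp']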

def search_topology_files(
    top_file: str, out_log: Optional[logging.Logger] = None
) -> typing.List[str]:
    """Search the top and itp files to create a list of the topology files.

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: List of paths of the topology files.
    """
    top_dir_name = str(Path(top_file).parent)
    file_list = []
    pattern = re.compile(r"#include\s+\"(.+)\"")
    if Path(top_file).exists():
        with open(top_file) as tf:
            for line in tf:
                include_file = pattern.match(line.strip())
                if include_file:
                    found_file = str(Path(top_dir_name).joinpath(include_file.group(1)))
                    file_list += search_topology_files(found_file, out_log)
    else:
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return file_list
    return file_list + [top_file]

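# A minimal usage sketch (illustrative; "system.top" is an assumed file whose
# #include lines point at *.itp files in the same directory):
#
#     topology_files = search_topology_files("system.top")
#     # -> [...included *.itp paths..., 'system.top'] (included files first)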

def zip_top(
    zip_file: str,
    top_file: str,
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> typing.List[str]:
    """Compress the topology files referenced (recursively) by **top_file** into the **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Remove the original topology files after compressing them.

    Returns:
        :obj:`list` of :obj:`str`: List of compressed paths.
    """

    file_list = search_topology_files(top_file, out_log)
    zip_list(zip_file, file_list, out_log)
    if remove_original_files:
        rm_file_list(file_list, out_log)
    return file_list


def unzip_top(
    zip_file: str,
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[typing.Union[pathlib.Path, str]] = None,
) -> str:
    """Extract all files in the zip_file and return the path of the extracted ".top" file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted.

    Returns:
        str: Path to the extracted ".top" file.

    """
    unique_dir = unique_dir or create_unique_dir()
    top_list = unzip_list(zip_file, unique_dir, out_log)
    top_file = next(name for name in top_list if name.endswith(".top"))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for file_name in top_list:
            out_log.info(file_name)
    return top_file

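# A minimal round-trip sketch (illustrative; the file names are assumptions):
#
#     zip_top("topology.zip", "system.top")  # bundles system.top + included *.itp
#     top_path = unzip_top("topology.zip")   # extracts into a fresh unique dir
#     # -> e.g. '<unique_dir>/system.top'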

def get_logs_prefix():
    return 4 * " "


def get_logs(
    path: Optional[str] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    level: str = "INFO",
    light_format: bool = False,
) -> typing.Tuple[logging.Logger, logging.Logger]:
    """Get the err and out Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    prefix = prefix if prefix else ""
    step = step if step else ""
    path = path if path else str(Path.cwd())

    out_log_path = create_name(path=path, prefix=prefix, step=step, name="log.out")
    err_log_path = create_name(path=path, prefix=prefix, step=step, name="log.err")

    # If the log file exists, create a new one adding a number at the end
    if Path(out_log_path).exists():
        name = "log.out"
        cont = 1
        while Path(out_log_path).exists():
            name = name.split(".")[0].rstrip("\\/0123456789_") + str(cont) + ".out"
            out_log_path = create_name(path=path, prefix=prefix, step=step, name=name)
            cont += 1
    if Path(err_log_path).exists():
        name = "log.err"
        cont = 1
        while Path(err_log_path).exists():
            name = name.split(".")[0].rstrip("\\/0123456789_") + str(cont) + ".err"
            err_log_path = create_name(path=path, prefix=prefix, step=step, name=name)
            cont += 1

    # Create the log directory if it does not exist
    create_dir(str(Path(out_log_path).resolve().parent))

    # Create logging format
    logFormatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    )
    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")
    # Create logging objects
    out_Logger = logging.getLogger(out_log_path)
    err_Logger = logging.getLogger(err_log_path)

    # Create FileHandlers
    out_fileHandler = logging.FileHandler(
        out_log_path, mode="a", encoding=None, delay=False
    )
    err_fileHandler = logging.FileHandler(
        err_log_path, mode="a", encoding=None, delay=False
    )

    # Assign format to the FileHandlers
    out_fileHandler.setFormatter(logFormatter)
    err_fileHandler.setFormatter(logFormatter)

    # Assign each FileHandler to its logging object
    if not len(out_Logger.handlers):
        out_Logger.addHandler(out_fileHandler)
        err_Logger.addHandler(err_fileHandler)

    # Create consoleHandler
    consoleHandler = logging.StreamHandler(stream=sys.stdout)
    # Assign format to consoleHandler
    consoleHandler.setFormatter(logFormatter)

    # Assign consoleHandler to the logging objects as an additional output
    if can_write_console and len(out_Logger.handlers) < 2:
        out_Logger.addHandler(consoleHandler)
        err_Logger.addHandler(consoleHandler)

    # Set logging level
    out_Logger.setLevel(level)
    err_Logger.setLevel(level)
    return out_Logger, err_Logger

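# A minimal usage sketch (illustrative; the prefix/step names are assumptions):
#
#     out_log, err_log = get_logs(path="logs", prefix="wf1", step="step1")
#     out_log.info("hello")  # written to logs/wf1_step1_log.out (and stdout)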

def launchlogger(func):
    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        args[0].out_log, args[0].err_log = get_logs(
            path=args[0].path,
            prefix=args[0].prefix,
            step=args[0].step,
            can_write_console=args[0].can_write_console_log,
        )
        value = func(*args, **kwargs)
        # Iterate over a copy [:] of each handler list so it can be modified while iterating
        for handler in args[0].out_log.handlers[:]:
            handler.close()
            args[0].out_log.removeHandler(handler)
        for handler in args[0].err_log.handlers[:]:
            handler.close()
            args[0].err_log.removeHandler(handler)
        return value

    return wrapper_log

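# A minimal usage sketch (illustrative): launchlogger expects the decorated
# method's instance to expose path, prefix, step and can_write_console_log
# attributes, plus out_log/err_log slots. The class below is hypothetical:
#
#     class Step:
#         path, prefix, step, can_write_console_log = "logs", "wf1", "step1", True
#
#         @launchlogger
#         def launch(self):
#             self.out_log.info("running")  # handlers are closed afterwards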

def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Write **string** to the local and/or global logs, if they exist.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): Local log object.
        global_log (:obj:`logging.Logger`): Global log object.

    """
    if local_log:
        local_log.info(string)
    if global_log:
        global_log.info(get_logs_prefix() + string)


def human_readable_time(time_ps: int) -> str:
    """Transform **time_ps** into a human readable string.

    Args:
        time_ps (int): Time in picoseconds.

    Returns:
        str: Human readable time.
    """
    time_units = [
        "femto seconds",
        "pico seconds",
        "nano seconds",
        "micro seconds",
        "milli seconds",
    ]
    t = time_ps * 1000
    for tu in time_units:
        if t < 1000:
            return str(t) + " " + tu
        t /= 1000
    # Fall back to the raw picosecond value if no unit fits
    return str(time_ps)

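# A quick worked example: 100000 ps -> 1e8 fs; dividing by 1000 twice lands on
# the nanosecond bucket:
#
#     human_readable_time(100000)  # -> '100.0 nano seconds'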

def check_properties(obj: object, properties: dict, reserved_properties: Optional[typing.List[str]] = None):
    """Warn about property names in **properties** that do not match any attribute of **obj**."""
    if not reserved_properties:
        reserved_properties = []
    reserved_properties = set(["system", "working_dir_path"] + reserved_properties)
    error_properties = set(
        [prop for prop in properties.keys() if prop not in obj.__dict__.keys()]
    )
    error_properties -= reserved_properties
    for error_property in error_properties:
        close_property = difflib.get_close_matches(
            error_property, obj.__dict__.keys(), n=1, cutoff=0.01
        )
        close_property = close_property[0] if close_property else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (error_property, close_property)
        )


def create_name(
    path: Optional[str] = None, prefix: Optional[str] = None, step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Return file name.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    name = "" if name is None else name.strip()
    if step:
        if name:
            name = step + "_" + name
        else:
            name = step
    if prefix:
        prefix = prefix.replace("/", "_")
        if name:
            name = prefix + "_" + name
        else:
            name = prefix
    if path:
        if name:
            name = str(Path(path).joinpath(name))
        else:
            name = path
    return name

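# A quick worked example of the composition order (step, then prefix, then path):
#
#     create_name(path="logs", prefix="wf1", step="step1", name="log.out")
#     # -> 'logs/wf1_step1_log.out'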

def write_failed_output(file_name: str):
    with open(file_name, "w") as f:
        f.write("Error\n")


def rm(file_name: str) -> Optional[str]:
    """Remove **file_name** (file or directory tree); return it on success, None otherwise."""
    try:
        file_path = pathlib.Path(file_name)
        if file_path.exists():
            if file_path.is_dir():
                shutil.rmtree(file_name)
                return file_name
            if file_path.is_file():
                Path(file_name).unlink()
                return file_name
    except Exception:
        pass
    return None


def rm_file_list(
    file_list: typing.Iterable[str], out_log: Optional[logging.Logger] = None
) -> typing.List[str]:
    removed_files = [f for f in file_list if rm(f)]
    if out_log:
        log("Removed: %s" % str(removed_files), out_log)
    return removed_files


def check_complete_files(output_file_list: typing.Iterable[str]) -> bool:
    """Return True only if every non-empty entry of **output_file_list** is an existing, non-empty file."""
    for output_file in filter(None, output_file_list):
        if not (Path(output_file).is_file() and Path(output_file).stat().st_size > 0):
            return False
    return True

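# A minimal usage sketch (illustrative; None entries are skipped by filter):
#
#     check_complete_files(["out.pdb", None, "out.log"])
#     # -> True only if out.pdb and out.log both exist and are non-empty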

def copy_to_container(
    container_path: str,
    container_volume_path: str,
    io_dict: typing.Mapping,
    out_log: Optional[logging.Logger] = None,
) -> dict:
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files COPY and assign INTERNAL PATH
    for file_ref, file_path in io_dict["in"].items():
        if file_path:
            if Path(file_path).exists():
                shutil.copy2(file_path, unique_dir)
                log(f"Copy: {file_path} to {unique_dir}", out_log)
                container_io_dict["in"][file_ref] = str(
                    Path(container_volume_path).joinpath(Path(file_path).name)
                )
            else:
                # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
                container_io_dict["in"][file_ref] = file_path

    # OUT files assign INTERNAL PATH
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict


def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    if not container_path:
        return

    # OUT files COPY
    for file_ref, file_path in container_io_dict["out"].items():
        if file_path:
            container_file_path = str(
                Path(container_io_dict["unique_dir"]).joinpath(Path(file_path).name)
            )
            if Path(container_file_path).exists():
                shutil.copy2(container_file_path, io_dict["out"][file_ref])


def create_cmd_line(
    cmd: typing.Iterable[str],
    container_path: str = "",
    host_volume: Optional[str] = None,
    container_volume: Optional[str] = None,
    container_working_dir: Optional[str] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[str] = None,
    container_image: Optional[str] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None,
) -> typing.List[str]:
    container_path = container_path or ""
    if container_path.endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(container_image).exists():
            log(
                f"{container_image} does not exist, trying to pull it",
                out_log,
                global_log,
            )
            container_image_name = str(Path(container_image).with_suffix(".sif").name)
            singularity_pull_cmd = [
                container_path,
                "pull",
                "--name",
                container_image_name,
                container_image,
            ]
            try:
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(singularity_pull_cmd, out_log).launch()
                if Path(container_image_name).exists():
                    container_image = container_image_name
                else:
                    raise FileNotFoundError
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError
        singularity_cmd = [
            container_path,
            "exec",
            "-e",
            "--bind",
            host_volume + ":" + container_volume,
            container_image,
        ]
        # If we are working on a Mac, remove the -e option because it is not yet available
        if platform == "darwin":
            if "-e" in singularity_cmd:
                singularity_cmd.remove("-e")

        cmd = ['"' + " ".join(cmd) + '"']
        singularity_cmd.extend([container_shell_path, "-c"])
        return singularity_cmd + cmd

    elif container_path.endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [container_path, "run"]
        if container_working_dir:
            docker_cmd.append("-w")
            docker_cmd.append(container_working_dir)
        if container_volume:
            docker_cmd.append("-v")
            docker_cmd.append(host_volume + ":" + container_volume)
        if container_user_uid:
            docker_cmd.append("--user")
            docker_cmd.append(container_user_uid)

        docker_cmd.append(container_image)

        cmd = ['"' + " ".join(cmd) + '"']
        docker_cmd.extend([container_shell_path, "-c"])
        return docker_cmd + cmd

    elif container_path.endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [container_path, "run", "-I", container_image]
        if container_working_dir:
            pcocc_cmd.append("--cwd")
            pcocc_cmd.append(container_working_dir)
        if container_volume:
            pcocc_cmd.append("--mount")
            pcocc_cmd.append(host_volume + ":" + container_volume)
        if container_user_uid:
            pcocc_cmd.append("--user")
            pcocc_cmd.append(container_user_uid)

        cmd = ['\\"' + " ".join(cmd) + '\\"']
        pcocc_cmd.extend([container_shell_path, "-c"])
        return pcocc_cmd + cmd

    else:
        # log('Not using any container', out_log, global_log)
        return cmd

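# A minimal usage sketch of the Docker branch (illustrative; the image and
# paths are assumptions):
#
#     create_cmd_line(
#         ["gmx", "editconf", "-h"],
#         container_path="docker",
#         host_volume="/tmp/run",
#         container_volume="/data",
#         container_shell_path="/bin/bash",
#         container_image="gromacs/gromacs",
#     )
#     # -> ['docker', 'run', '-v', '/tmp/run:/data', 'gromacs/gromacs',
#     #     '/bin/bash', '-c', '"gmx editconf -h"']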

def get_doc_dicts(doc: Optional[str]):
    """Parse a biobb docstring and return the (arguments, properties) dictionaries."""
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    doc_lines = list(
        map(str.strip, filter(lambda line: line.strip(), doc.splitlines()))
    )
    args_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("args"), doc_lines))
    )
    properties_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("properties"), doc_lines))
    )
    examples_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("examples"), doc_lines))
    )
    arguments_lines_list = doc_lines[args_index + 1 : properties_index]
    properties_lines_list = doc_lines[properties_index + 1 : examples_index]

    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        argument_dict = regex_argument.match(argument_line).groupdict()
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    doc_properties_dict = {}
    for property_line in properties_lines_list:
        property_dict = regex_property.match(property_line).groupdict()
        property_dict["values"] = None
        if "Values:" in property_dict["description"]:
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:")
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict


def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    extension_list: Optional[typing.List[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    if optional and not path:
        return None

    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)
        input_file = False  # Avoid an unbound variable below when only warning

    if input_file or output_files_created:
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        if not path.exists():
            log(not_found_error_string, out_log)
            if raise_exception:
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
                )
            warnings.warn(not_found_error_string)
    # else:
    #     if not path.parent.exists():
    #         not_found_dir_error_string = f"Path {path.parent} --- {module_name}: Unexisting {argument} directory."
    #         log(not_found_dir_error_string, out_log)
    #         if raise_exception:
    #             raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), not_found_dir_error_string)
    #         warnings.warn(not_found_dir_error_string)

    if check_extensions and extension_list:
        no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
        if not path.suffix:
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        else:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            if path.suffix[1:].lower() not in extension_list:
                log(not_valid_extension_error_string, out_log)
                warnings.warn(not_valid_extension_error_string)