Coverage for biobb_common / biobb_common / tools / file_utils.py: 43%

410 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 13:18 +0000

1"""Tools to work with files 

2""" 

3import difflib 

4import functools 

5import logging 

6import os 

7import errno 

8import pathlib 

9import re 

10import shutil 

11import uuid 

12import warnings 

13import zipfile 

14from sys import platform 

15from pathlib import Path 

16import typing 

17from typing import Optional, Union 

18import sys 

19from contextlib import contextmanager 

20 

21 

def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str:
    """Return an absolute path inside *parent_dir* that no existing file occupies.

    Args:
        parent_dir (str): (cwd) Directory the generated path will live in.
        extension (str): ('') Suffix appended to the generated name.

    Returns:
        str: Absolute path built from a fresh UUID4 name.
    """
    base_dir = Path(parent_dir if parent_dir else Path.cwd()).resolve()
    suffix = extension if extension else ""
    while True:
        candidate = base_dir / f"{uuid.uuid4()}{suffix}"
        if not candidate.exists():
            return str(candidate)

32 

33 

def create_dir(dir_path: str) -> str:
    """Return the directory **dir_path**, creating it (and missing parents) if needed.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: Directory dir path.
    """
    # mkdir(exist_ok=True) is race-free; the original separate exists() check
    # could fail if the directory appeared between the check and the mkdir.
    Path(dir_path).mkdir(exist_ok=True, parents=True)
    return str(Path(dir_path))

46 

47 

def create_stdin_file(intput_string: str) -> str:
    """Write *intput_string* into a freshly named ``.stdin`` file.

    Args:
        intput_string (str): Text to store in the stdin file.

    Returns:
        str: Path of the newly created file.
    """
    stdin_path = create_unique_file_path(extension=".stdin")
    with open(stdin_path, "w") as handle:
        handle.write(intput_string)
    return stdin_path

53 

54 

def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name. If the
    computed name collides with an existing file name it attempts
    **number_attempts** times to create another unique id and create
    the directory with the new name.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) number of times creating the directory if there's a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.

    Raises:
        FileExistsError: If no unique directory could be created after
            **number_attempts** attempts.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    for i in range(number_attempts):
        # Temporarily clear the umask so the 0o777 mode is applied verbatim.
        oldumask = os.umask(0)
        try:
            Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
            if out_log:
                out_log.info("Directory successfully created: %s" % new_dir)
            return new_dir
        except OSError:
            if out_log:
                out_log.info(new_dir + " Already exists")
                out_log.info("Retrying %i times more" % (number_attempts - i))
            new_dir = prefix + str(uuid.uuid4().hex)
            if path:
                new_dir = str(Path(path).joinpath(new_dir))
            if out_log:
                out_log.info("Trying with: " + new_dir)
        finally:
            # Always restore the previous umask: the original code skipped the
            # restore on OSError, permanently leaving the process umask at 0.
            os.umask(oldumask)
    raise FileExistsError(
        f"Could not create a unique directory after {number_attempts} attempts"
    )

96 

97 

def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str:
    """Return the directory **working_dir_path** and create it if working_dir_path
    does not exist. If **working_dir_path** exists a consecutive numerical suffix
    is added to the end of the **working_dir_path** and is returned.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If step result exists do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    resolved = str(Path(working_dir_path).resolve())

    if restart or not Path(resolved).exists():
        return resolved

    # Drop any previous "_<number>" suffix, then try consecutive counters
    # until a non-existing path is found.
    counter = 1
    candidate = resolved
    while Path(candidate).exists():
        candidate = re.split(r"_[0-9]+$", candidate)[0] + "_" + str(counter)
        counter += 1
    return candidate

125 

126 

def zip_list(
    zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    file_list = list(file_list)
    file_list.sort()
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as zip_f:
        inserted = []
        for index, f in enumerate(file_list):
            base_name = Path(f).name
            if base_name in inserted:
                # Disambiguate duplicate base names so no entry is clobbered.
                base_name = "file_" + str(index) + "_" + base_name
            inserted.append(base_name)
            zip_f.write(f, arcname=base_name)
    if out_log:
        def _display(file_path):
            # Show paths relative to the cwd when possible; the original
            # unconditional relative_to() raised ValueError for files
            # located outside the current working directory.
            resolved = Path(file_path).resolve()
            try:
                return str(resolved.relative_to(Path.cwd()))
            except ValueError:
                return str(resolved)
        out_log.info("Adding:")
        out_log.info([_display(f) for f in file_list])
        out_log.info("to: " + str(Path(zip_file).resolve()))

152 

153 

def unzip_list(
    zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Extract all files in the zipball file and return a list containing the
    absolute path of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): Path to directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files.
    """
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        # extractall() falls back to the cwd when dest_dir is None; mirror
        # that here. The original built paths from str(None) -> "None/...".
        base_dir = Path(dest_dir) if dest_dir else Path.cwd()
        file_list = [str(base_dir.joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list

178 

179 

def search_topology_files(
    top_file: Union[str, Path], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Search the top and itp files to create a list of the topology files

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files.
    """
    include_pattern = re.compile(r"#include\s+\"(.+)\"")
    parent_dir = str(Path(top_file).parent)

    if not Path(top_file).exists():
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return []

    # Recursively collect every #include'd file first, then the file itself.
    found: list = []
    with open(top_file) as handle:
        for raw_line in handle:
            match = include_pattern.match(raw_line.strip())
            if match:
                included = str(Path(parent_dir).joinpath(match.group(1)))
                found += search_topology_files(included, out_log)
    return found + [str(top_file)]

207 

208 

def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> list[str]:
    """Compress all *.itp and *.top files in the cwd into **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Delete the topology files after zipping.

    Returns:
        :obj:`list` of :obj:`str`: list of compressed paths.
    """
    topology_files = search_topology_files(top_file, out_log)
    zip_list(zip_file, topology_files, out_log)
    if remove_original_files:
        rm_file_list(topology_files, out_log)
    return topology_files

231 

232 

def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[Union[pathlib.Path, str]] = None,
) -> str:
    """Extract all files in the zip_file and copy the file extracted ".top" file to top_file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted.

    Returns:
        str: Path to the extracted ".top" file.
    """
    extraction_dir = unique_dir if unique_dir else create_unique_dir()
    extracted = unzip_list(zip_file, extraction_dir, out_log)
    # The first ".top" entry is taken as the main topology file.
    top_file = next(name for name in extracted if name.endswith(".top"))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for extracted_name in extracted:
            out_log.info(extracted_name)
    return top_file

259 

260 

def get_logs_prefix() -> str:
    """Return the 4-space indentation prefix used for global log messages."""
    return "    "

263 

264 

def create_incremental_name(path: Union[Path, str]) -> str:
    """Increment the name of the file by adding a number at the end.

    Args:
        path (str): path of the file.

    Returns:
        str: Incremented name of the file.
    """
    candidate = Path(path)
    counter = 1
    while candidate.exists():
        # Strip any trailing digits/underscores from the stem so counters
        # replace each other instead of accumulating ("log_1" -> "log_2").
        stripped_stem = candidate.stem.rstrip("0123456789_")
        candidate = candidate.with_name(f"{stripped_stem}_{counter}{candidate.suffix}")
        counter += 1
    return str(candidate)

281 

282 

def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    can_write_file: bool = True,
    out_log_path: Optional[Union[str, Path]] = None,
    err_log_path: Optional[Union[str, Path]] = None,
    level: str = "INFO",
    light_format: bool = False,
) -> tuple[logging.Logger, logging.Logger]:
    """Get the error and out Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        can_write_file (bool): (True) If True, write log to the log files.
        out_log_path (str): (None) Path to the out log file.
        err_log_path (str): (None) Path to the err log file.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        level (str): ('INFO') Set Logging level.
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    out_log_path = out_log_path or "log.out"
    err_log_path = err_log_path or "log.err"
    # Relative log paths are composed from path/prefix/step and made unique
    # with an incremental "_<n>" suffix; absolute paths are used verbatim.
    if not Path(out_log_path).is_absolute():
        out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path)))
    if not Path(err_log_path).is_absolute():
        err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path)))
    # The final file path doubles as the logger name, so each log file gets
    # its own Logger instance (cached process-wide by logging.getLogger).
    out_Logger = logging.getLogger(str(out_log_path))
    err_Logger = logging.getLogger(str(err_log_path))

    # Create logging format
    logFormatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    )
    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")

    if can_write_file:
        # NOTE(review): these three defaults are computed but never used
        # afterwards; the composed names were already resolved above.
        prefix = prefix if prefix else ""
        step = step if step else ""
        path = path if path else str(Path.cwd())

        # Create dir if it not exists
        create_dir(str(Path(out_log_path).resolve().parent))

        # Create FileHandler; delay=True postpones opening the file until
        # the first record is emitted.
        out_fileHandler = logging.FileHandler(out_log_path, mode="a", encoding=None, delay=True)
        err_fileHandler = logging.FileHandler(err_log_path, mode="a", encoding=None, delay=True)
        # Assign format to FileHandler
        out_fileHandler.setFormatter(logFormatter)
        err_fileHandler.setFormatter(logFormatter)

        # Attach the file handlers only once per logger: getLogger caches by
        # name, so a repeated call would otherwise duplicate handlers.
        if not len(out_Logger.handlers):
            out_Logger.addHandler(out_fileHandler)
            err_Logger.addHandler(err_fileHandler)

    if can_write_console:
        console_out = logging.StreamHandler(stream=sys.stdout)
        console_err = logging.StreamHandler(stream=sys.stderr)
        console_out.setFormatter(logFormatter)
        console_err.setFormatter(logFormatter)
        # Assign consoleHandler to logging objects as additional output;
        # the "< 2" guard keeps at most one console handler per logger.
        if len(out_Logger.handlers) < 2:
            out_Logger.addHandler(console_out)
            err_Logger.addHandler(console_err)

    # Set logging level
    out_Logger.setLevel(level)
    err_Logger.setLevel(level)

    return out_Logger, err_Logger

363 

364 

def launchlogger(func):
    """Decorator to create the out_log and err_log around a biobb step method.

    Expects to wrap a method whose ``self`` (args[0]) exposes the attributes
    read below: path, prefix, step, disable_logs, can_write_console_log,
    can_write_file_log, out_log_path and err_log_path — TODO confirm against
    the BiobbObject base class.
    """
    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        # Make sure the step working directory exists before anything logs.
        create_dir(create_name(path=args[0].path))
        if args[0].disable_logs:
            return func(*args, **kwargs)

        # Create local out_log and err_log
        args[0].out_log, args[0].err_log = get_logs(
            path=args[0].path,
            prefix=args[0].prefix,
            step=args[0].step,
            can_write_console=args[0].can_write_console_log,
            can_write_file=args[0].can_write_file_log,
            out_log_path=args[0].out_log_path,
            err_log_path=args[0].err_log_path
        )

        # Run the function and capture its return value
        value = func(*args, **kwargs)

        # Close and remove handlers from out_log and err_log so the files are
        # flushed and released. (NOTE: this loop variable shadows the
        # module-level log() helper inside this function body.)
        for log in [args[0].out_log, args[0].err_log]:
            # Create a copy [:] of the handler list to be able to modify it while we are iterating
            handlers = log.handlers[:]
            for handler in handlers:
                handler.close()
                log.removeHandler(handler)

        return value

    return wrapper_log

398 

399 

def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Send *string* to whichever of the two loggers exist.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): local log object.
        global_log (:obj:`logging.Logger`): global log object.
    """
    if local_log is not None:
        local_log.info(string)
    if global_log is not None:
        # Global messages are indented to distinguish them from workflow-level ones.
        global_log.info(get_logs_prefix() + string)

413 

414 

def human_readable_time(time_ps: int) -> str:
    """Transform **time_ps** to a human readable string.

    Args:
        time_ps (int): Time in pico seconds.

    Returns:
        str: Human readable time.
    """
    units = (
        "femto seconds",
        "pico seconds",
        "nano seconds",
        "micro seconds",
        "mili seconds",
    )
    value = time_ps * 1000  # start counting in femtoseconds
    for unit in units:
        if value < 1000:
            return f"{value} {unit}"
        value = int(value / 1000)
    # Larger than milliseconds: fall back to the raw picosecond figure.
    return str(time_ps)

438 

439 

def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None):
    """Warn about every key of *properties* that is not an attribute of *obj*.

    Args:
        obj (object): Object whose instance attributes define the recognized properties.
        properties (dict): Candidate property names to validate.
        reserved_properties (list): (None) Extra names that are always accepted.
    """
    accepted = set(["system", "working_dir_path"] + list(reserved_properties or []))
    known_names = obj.__dict__.keys()
    unknown = {prop for prop in properties.keys() if prop not in known_names} - accepted
    for bad_prop in unknown:
        # Suggest the closest existing attribute name, if any.
        suggestions = difflib.get_close_matches(bad_prop, known_names, n=1, cutoff=0.01)
        suggestion = suggestions[0] if suggestions else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (bad_prop, suggestion)
        )

456 

457 

def create_name(
    path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None,
    step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Return file name.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    # Compose "<prefix>_<step>_<name>" from whichever pieces are non-empty.
    parts = []
    if prefix:
        # Slashes in the prefix would create spurious subdirectories.
        parts.append(prefix.replace("/", "_"))
    if step:
        parts.append(step)
    stripped_name = name.strip() if name is not None else ""
    if stripped_name:
        parts.append(stripped_name)
    composed = "_".join(parts)
    if path:
        return str(Path(path).joinpath(composed)) if composed else str(path)
    return composed

491 

492 

def write_failed_output(file_name: str):
    """Overwrite *file_name* with a minimal "Error" marker."""
    with open(file_name, "w") as handle:
        handle.write("Error\n")

496 

497 

def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Delete *file_name* (regular file or directory tree).

    Args:
        file_name (str): Path to remove.

    Returns:
        str: The removed path on success, None if nothing was removed
        or any error occurred.
    """
    try:
        target = pathlib.Path(file_name)
        if target.is_dir():
            shutil.rmtree(file_name)
            return file_name
        if target.is_file():
            Path(file_name).unlink()
            return file_name
    except Exception:
        # Best-effort removal: any filesystem error is deliberately swallowed.
        pass
    return None

511 

512 

def rm_file_list(
    file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Remove every path in *file_list*, returning those actually removed.

    Args:
        file_list (list): Paths to delete.
        out_log (:obj:`logging.Logger`): (None) Log object.

    Returns:
        :obj:`list` of :obj:`str`: Paths that were successfully removed.
    """
    removed = [str(file_name) for file_name in file_list if rm(file_name)]
    if removed and out_log:
        log("Removed: %s" % str(removed), out_log)
    return removed

520 

521 

def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool:
    """Return True only if every non-falsy entry is a non-empty file or a
    non-empty directory.

    Args:
        output_file_list (list): Paths expected to exist with content;
            falsy entries (None, "") are skipped.

    Returns:
        bool: False as soon as one entry is missing or empty, True otherwise.
    """
    for entry in filter(None, output_file_list):
        candidate = Path(str(entry))
        has_content = (
            (candidate.is_file() and candidate.stat().st_size > 0)
            or (candidate.is_dir() and any(candidate.iterdir()))
        )
        if not has_content:
            return False
    return True

530 

531 

def copytree_new_files_only(source, destination):
    """
    Recursively copy files from *source* to *destination*, skipping any
    destination file that already exists and is at least as new as its source.
    """
    if not os.path.exists(destination):
        os.makedirs(destination)

    for current_dir, _subdirs, file_names in os.walk(source):
        # Mirror the directory layout under the destination.
        relative = os.path.relpath(current_dir, source)
        target_dir = os.path.join(destination, relative)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        for file_name in file_names:
            src_path = os.path.join(current_dir, file_name)
            dst_path = os.path.join(target_dir, file_name)
            # Copy when missing at the destination or when the source is newer.
            if not os.path.exists(dst_path) or os.path.getmtime(src_path) > os.path.getmtime(dst_path):
                shutil.copy2(src_path, dst_path)

554 

555 

def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str,
                      io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict:
    """Stage input files into a fresh unique dir and remap io paths onto the
    container volume; a falsy *container_path* returns *io_dict* untouched.

    Args:
        container_path (str): Container engine path; falsy disables staging.
        container_volume_path (str): Mount point of the volume inside the container.
        io_dict (dict): Host io dict with "in" and "out" file paths.
        out_log (:obj:`logging.Logger`): (None) Log object.

    Returns:
        dict: io dict remapped to container-internal paths, plus "unique_dir".
    """
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files: copy to the staging dir and point at the container volume.
    for file_ref, file_path in io_dict["in"].items():
        if not file_path:
            continue
        if Path(file_path).exists():
            shutil.copy2(file_path, unique_dir)
            log(f"Copy: {file_path} to {unique_dir}")
            container_io_dict["in"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )
        else:
            # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
            container_io_dict["in"][file_ref] = file_path

    # OUT files: only remap the paths; they get produced inside the container.
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict

585 

586 

def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy output files from the container staging dir back to the host paths.

    Args:
        container_path (str): Container engine path; falsy makes this a no-op.
        container_io_dict (dict): Container io dict holding "unique_dir" and "out" paths.
        io_dict (dict): Host io dict with the destination "out" paths.
    """
    if not container_path:
        return

    staging_dir = Path(container_io_dict["unique_dir"])
    for file_ref, file_path in container_io_dict["out"].items():
        if not file_path:
            continue
        # Outputs land in the staging dir under their base name.
        staged_file = str(staging_dir.joinpath(Path(file_path).name))
        if Path(staged_file).exists():
            shutil.copy2(staged_file, io_dict["out"][file_ref])

599 

600 

def create_cmd_line(
    cmd: list[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None
) -> list[str]:
    """Wrap *cmd* with a container invocation (Singularity, Docker or pcocc).

    The engine is chosen by the suffix of *container_path*; any other value
    (including empty) returns *cmd* unchanged.

    Args:
        cmd (list): Command-line tokens to execute.
        container_path (str): ('') Path to the container engine binary.
        host_volume (str): (None) Host directory to mount.
        container_volume (str): (None) Mount point inside the container.
        container_working_dir (str): (None) Working dir inside the container.
        container_user_uid (str): (None) UID to run as inside the container.
        container_shell_path (str): (None) Shell used to run the command inside the container.
        container_image (str): (None) Image name/path.
        out_log (:obj:`logging.Logger`): (None) Local log object.
        global_log (:obj:`logging.Logger`): (None) Global log object.

    Returns:
        :obj:`list` of :obj:`str`: Full command line ready to be launched.

    Raises:
        FileNotFoundError: If a Singularity image is missing and pulling it fails.
    """
    container_path = container_path or ""
    if str(container_path).endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        # Missing local image: try to pull it as a .sif file first.
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist trying to pull it",
                out_log,
                global_log,
            )
            container_image_name = str(Path(str(container_image)).with_suffix(".sif").name)
            singularity_pull_cmd = [
                str(container_path),
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                # Imported lazily to avoid a circular import at module load time.
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch()
                if Path(container_image_name).exists():
                    container_image = container_image_name
                else:
                    raise FileNotFoundError
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError
        singularity_cmd: list[str] = [
            str(container_path),
            "exec",
            "-e",
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
        ]
        # If we are working on a mac remove -e option because is still no available
        if platform == "darwin":
            if "-e" in singularity_cmd:
                singularity_cmd.remove("-e")

        # The wrapped command is passed as one double-quoted string to "<shell> -c".
        cmd = ['"' + " ".join(cmd) + '"']
        singularity_cmd.extend([str(container_shell_path), "-c"])
        return singularity_cmd + cmd

    elif str(container_path).endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [str(container_path), "run"]
        # Optional flags are only emitted when their value is set.
        if container_working_dir:
            docker_cmd.append("-w")
            docker_cmd.append(str(container_working_dir))
        if container_volume:
            docker_cmd.append("-v")
            docker_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            docker_cmd.append("--user")
            docker_cmd.append(container_user_uid)

        docker_cmd.append(str(container_image))

        # The wrapped command is passed as one double-quoted string to "<shell> -c".
        cmd = ['"' + " ".join(cmd) + '"']
        docker_cmd.extend([str(container_shell_path), "-c"])
        return docker_cmd + cmd

    elif str(container_path).endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [str(container_path), "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd.append("--cwd")
            pcocc_cmd.append(str(container_working_dir))
        if container_volume:
            pcocc_cmd.append("--mount")
            pcocc_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            pcocc_cmd.append("--user")
            pcocc_cmd.append(container_user_uid)

        # pcocc needs the quotes escaped so they survive an extra shell layer.
        cmd = ['\\"' + " ".join(cmd) + '\\"']
        pcocc_cmd.extend([str(container_shell_path), "-c"])
        return pcocc_cmd + cmd

    else:
        # No recognized container engine: run the command natively.
        # log('Not using any container', out_log, global_log)
        return cmd

698 

699 

def get_doc_dicts(doc: Optional[str]):
    """Parse a biobb-convention docstring into argument and property dicts.

    Args:
        doc (str): Docstring with "Args:", "Properties:" and "Examples:"
            sections following the biobb documentation convention.

    Returns:
        :obj:`tuple` of :obj:`dict` and :obj:`dict`: (doc_arguments_dict,
        doc_properties_dict), keyed by argument/property name.
    """
    # One regex per documented element: argument lines, their accepted-format
    # lists, property lines and the per-value descriptions after "Values:".
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    # Strip blanks and locate the three section headers; the argument lines
    # sit between "Args" and "Properties", the property lines between
    # "Properties" and "Examples".
    doc_lines = list(
        map(str.strip, filter(lambda line: line.strip(), str(doc).splitlines()))
    )
    args_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("args"), doc_lines))
    )
    properties_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("properties"), doc_lines))
    )
    examples_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("examples"), doc_lines))
    )
    arguments_lines_list = doc_lines[args_index + 1: properties_index]
    properties_lines_list = doc_lines[properties_index + 1: examples_index]

    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        match_argument = regex_argument.match(argument_line)
        argument_dict = match_argument.groupdict() if match_argument is not None else {}
        # Expand the raw "Accepted formats" text into {extension: edam_id}.
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    doc_properties_dict = {}
    for property_line in properties_lines_list:
        match_property = regex_property.match(property_line)
        property_dict = match_property.groupdict() if match_property is not None else {}
        property_dict["values"] = None
        # A trailing "Values: a (desc) b (desc)" enumeration, when present,
        # is split off the description and expanded into {value: description}.
        if "Values:" in property_dict["description"]:
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:")
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict

756 

757 

def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    type: Optional[str] = None,
    extension_list: Optional[list[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Validate a file argument of a biobb module: existence and extension.

    Args:
        path (Path): Path to validate; may be None for optional arguments.
        argument (str): Argument name (used in messages).
        optional (bool): If True and *path* is falsy, validation is skipped.
        module_name (str): Calling module name (used in messages).
        input_output (str): (None) Direction: "in"/"input" or "out"/"output".
        output_files_created (bool): (False) If True, output files must already exist.
        type (str): (None) Argument type; "dir" skips extension checks.
        extension_list (list): (None) Valid extensions, without the leading dot.
        raise_exception (bool): (True) Raise instead of only warning on errors.
        check_extensions (bool): (True) Enable extension validation.
        out_log (:obj:`logging.Logger`): (None) Log object.

    Raises:
        FileNotFoundError: When the direction cannot be determined or the
            file is missing, and **raise_exception** is True.
    """
    if optional and not path:
        return None

    # Fix: default to False so the existence check below cannot hit an
    # unbound local when input_output is unrecognized and raise_exception
    # is False (the original raised NameError in that case).
    input_file = False
    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)

    # Input files (and outputs already produced) must exist on disk.
    if input_file or output_files_created:
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        if not Path(str(path)).exists():
            log(not_found_error_string, out_log)
            if raise_exception:
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
                )
            warnings.warn(not_found_error_string)

    if check_extensions and extension_list and type != "dir":
        no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
        if not Path(str(path)).suffix:
            # Fix: forward the message to out_log (it was previously dropped
            # because log() was called without a logger).
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        else:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            if Path(str(path)).suffix[1:].lower() not in extension_list:
                log(not_valid_extension_error_string, out_log)
                warnings.warn(not_valid_extension_error_string)

818 

819 

@contextmanager
def change_dir(destination):
    """Context manager for changing directory.

    Creates *destination* (with parents) when missing and always restores
    the previous working directory on exit.
    """
    previous_dir = os.getcwd()
    if not Path(destination).exists():
        os.makedirs(destination)
    try:
        os.chdir(destination)
        yield
    finally:
        os.chdir(previous_dir)