Coverage for biobb_common / biobb_common / tools / file_utils.py: 43%

411 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-05 15:29 +0000

1"""Tools to work with files 

2""" 

3import difflib 

4import functools 

5import logging 

6import os 

7import errno 

8import pathlib 

9import re 

10import shutil 

11import uuid 

12import warnings 

13import zipfile 

14from sys import platform 

15from pathlib import Path 

16import typing 

17from typing import Optional, Union 

18import sys 

19from contextlib import contextmanager 

20 

21 

def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str:
    """Return a path, named with a random UUID4, that does not exist yet.

    The file itself is not created; only the name is reserved by checking
    that nothing exists at that location at the time of the call.

    Args:
        parent_dir (str): (current working directory) Directory the path is built in.
        extension (str): ('') Text appended verbatim after the generated name.

    Returns:
        str: Resolved path of the not-yet-existing file.
    """
    base_dir = Path(parent_dir or Path.cwd())
    suffix = extension or ""
    while True:
        candidate = base_dir.resolve() / f"{uuid.uuid4()}{suffix}"
        if not candidate.exists():
            return str(candidate)

32 

33 

def create_dir(dir_path: str) -> str:
    """Make sure **dir_path** exists and return it.

    Args:
        dir_path (str): Path to the directory that will be created
            (missing parents are created too).

    Returns:
        str: The directory path.
    """
    target = Path(dir_path)
    if not target.exists():
        target.mkdir(parents=True, exist_ok=True)
    return str(target)

46 

47 

def create_stdin_file(intput_string: str) -> str:
    """Write **intput_string** into a new uniquely named ``.stdin`` file.

    Args:
        intput_string (str): Text to store in the file.

    Returns:
        str: Path of the file that was written.
    """
    stdin_path = create_unique_file_path(extension=".stdin")
    with open(stdin_path, "w") as stdin_handle:
        stdin_handle.write(intput_string)
    return stdin_path

53 

54 

def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name. If the
    directory cannot be created it attempts up to **number_attempts**
    times, generating a new unique id for every retry.

    The directory is created with mode 0o777; the process umask is cleared
    only for the duration of the ``mkdir`` call and is always restored,
    including when the call fails.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) number of times creating the directory if there's a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.

    Raises:
        FileExistsError: If no directory could be created after **number_attempts** tries.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    for attempt in range(number_attempts):
        old_umask = os.umask(0)
        try:
            Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
        except OSError:
            created = False
        else:
            created = True
        finally:
            # Restore the umask on every path; leaving it at 0 after a
            # failed attempt would change permissions process-wide.
            os.umask(old_umask)

        if created:
            if out_log:
                out_log.info("Directory successfully created: %s" % new_dir)
            return new_dir

        if out_log:
            out_log.info(new_dir + " Already exists")
            out_log.info("Retrying %i times more" % (number_attempts - attempt - 1))
        new_dir = prefix + str(uuid.uuid4().hex)
        if path:
            new_dir = str(Path(path).joinpath(new_dir))
        if out_log:
            out_log.info("Trying with: " + new_dir)
    raise FileExistsError(
        f"Unable to create a unique directory after {number_attempts} attempts"
    )

96 

97 

def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str:
    """Return the path to use as the workflow working directory.

    Nothing is created on disk. When **working_dir_path** already exists and
    **restart** is False, a trailing ``_<n>`` suffix is replaced/added with
    increasing numbers until a non-existing path is found.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If step result exists do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    resolved = str(Path(working_dir_path).resolve())
    if restart or not Path(resolved).exists():
        return str(Path(resolved))

    candidate = resolved
    counter = 1
    while Path(candidate).exists():
        # Drop any previous numeric suffix before appending the next one.
        stem = re.split(r"_[0-9]+$", candidate)[0]
        candidate = stem + "_" + str(counter)
        counter += 1
    return candidate

125 

126 

def zip_list(
    zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into **zip_file** zip file.

    Files are stored flat under their base name, in sorted order. When a base
    name was already stored, the entry is written as ``file_<index>_<name>``.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    ordered_files = sorted(file_list)
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as archive:
        used_names = set()
        for position, source in enumerate(ordered_files):
            arcname = Path(source).name
            if arcname in used_names:
                arcname = "file_" + str(position) + "_" + arcname
            used_names.add(arcname)
            archive.write(source, arcname=arcname)
    if not out_log:
        return
    out_log.info("Adding:")
    out_log.info(str(ordered_files))
    out_log.info("to: " + str(Path(zip_file).resolve()))

153 

154 

def unzip_list(
    zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Extract all files in the zipball file and return a list containing the
    paths of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): (current working directory) Path to directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files. Each
        path is **dest_dir** joined with the archive member name; when
        **dest_dir** is None the current working directory is used.
    """
    # ZipFile.extractall(path=None) extracts into the current working
    # directory, so the returned paths must be built from it as well
    # (str(None) would otherwise yield "None/<member>").
    target_dir = Path.cwd() if dest_dir is None else Path(str(dest_dir))
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        file_list = [str(target_dir.joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list

179 

180 

def search_topology_files(
    top_file: Union[str, Path], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Collect **top_file** and every file it pulls in through ``#include "..."``.

    Includes are resolved relative to the directory of the including file and
    followed recursively. Files that do not exist are logged and ignored.

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: included files first (depth first), then **top_file** itself.
    """
    top_path = Path(top_file)
    if not top_path.exists():
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return []

    include_re = re.compile(r"#include\s+\"(.+)\"")
    parent_dir = str(top_path.parent)
    collected = []
    with open(top_file) as handle:
        for raw_line in handle:
            hit = include_re.match(raw_line.strip())
            if not hit:
                continue
            included = str(Path(parent_dir).joinpath(hit.group(1)))
            collected += search_topology_files(included, out_log)
    return collected + [str(top_file)]

208 

209 

def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> list[str]:
    """Compress **top_file** and the files it includes into **zip_file**.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Remove the compressed files that
            live in the same directory as **top_file**.

    Returns:
        :obj:`list` of :obj:`str`: list of compressed paths.
    """
    topology_files = search_topology_files(top_file, out_log)
    zip_list(zip_file, topology_files, out_log)
    if remove_original_files:
        # Only remove files on the same directory of the top file
        top_parent = Path(top_file).parent
        siblings = [item for item in topology_files if Path(item).parent == top_parent]
        rm_file_list(siblings, out_log)
    return topology_files

233 

234 

def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[Union[pathlib.Path, str]] = None,
) -> str:
    """Extract a topology zipball and return the path of its ".top" file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted.
            A new unique directory is created when it is not given.

    Returns:
        str: Path to the extracted ".top" file.

    Raises:
        StopIteration: If the zipball contains no file ending in ".top".
    """
    target_dir = unique_dir or create_unique_dir()
    extracted = unzip_list(zip_file, target_dir, out_log)
    top_file = next(filter(lambda name: name.endswith(".top"), extracted))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for extracted_name in extracted:
            out_log.info(extracted_name)
    return top_file

261 

262 

def get_logs_prefix():
    """Return the four-space indentation used for global log messages."""
    return " " * 4

265 

266 

def create_incremental_name(path: Union[Path, str]) -> str:
    """Return **path**, or a numbered variant of it when it already exists.

    While the candidate exists, trailing digits and underscores are stripped
    from its stem and ``_<n>`` (n = 1, 2, ...) is appended before the suffix.

    Args:
        path (str): path of the file.

    Returns:
        str: First candidate name that does not exist.
    """
    candidate = Path(path)
    counter = 1
    while candidate.exists():
        base = candidate.stem.rstrip("0123456789_")
        candidate = candidate.with_name(f"{base}_{counter}{candidate.suffix}")
        counter += 1
    return str(candidate)

283 

284 

def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    can_write_file: bool = True,
    out_log_path: Optional[Union[str, Path]] = None,
    err_log_path: Optional[Union[str, Path]] = None,
    level: str = "INFO",
    light_format: bool = False,
) -> tuple[logging.Logger, logging.Logger]:
    """Get the out and err Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        can_write_file (bool): (True) If True, write log to the log files.
        out_log_path (str): (None) Path to the out log file.
        err_log_path (str): (None) Path to the err log file.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    out_log_path = out_log_path or "log.out"
    err_log_path = err_log_path or "log.err"
    # Relative log paths are composed from path/prefix/step and made unique
    if not Path(out_log_path).is_absolute():
        out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path)))
    if not Path(err_log_path).is_absolute():
        err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path)))
    # Loggers are keyed by their log file path
    out_Logger = logging.getLogger(str(out_log_path))
    err_Logger = logging.getLogger(str(err_log_path))

    # Create logging format
    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")
    else:
        logFormatter = logging.Formatter(
            "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
        )

    if can_write_file:
        # Create the parent directory of BOTH log files: the handlers open
        # their file lazily (delay=True), so a missing err-log directory
        # would only fail later, on the first emitted record.
        create_dir(str(Path(out_log_path).resolve().parent))
        create_dir(str(Path(err_log_path).resolve().parent))

        # Attach file handlers only once per logger pair (the out logger's
        # handler count decides for both, as before).
        if not len(out_Logger.handlers):
            out_fileHandler = logging.FileHandler(out_log_path, mode="a", encoding=None, delay=True)
            err_fileHandler = logging.FileHandler(err_log_path, mode="a", encoding=None, delay=True)
            out_fileHandler.setFormatter(logFormatter)
            err_fileHandler.setFormatter(logFormatter)
            out_Logger.addHandler(out_fileHandler)
            err_Logger.addHandler(err_fileHandler)

    if can_write_console:
        # Assign console handlers to logging objects as additional output
        if len(out_Logger.handlers) < 2:
            console_out = logging.StreamHandler(stream=sys.stdout)
            console_err = logging.StreamHandler(stream=sys.stderr)
            console_out.setFormatter(logFormatter)
            console_err.setFormatter(logFormatter)
            out_Logger.addHandler(console_out)
            err_Logger.addHandler(console_err)

    # Set logging level
    out_Logger.setLevel(level)
    err_Logger.setLevel(level)

    return out_Logger, err_Logger

365 

366 

def launchlogger(func):
    """Decorator to create the out_log and err_log

    The first positional argument of the wrapped callable is expected to be
    the object carrying the logging configuration attributes read below
    (``path``, ``prefix``, ``step``, ``disable_logs``, ...). After the call
    returns, every handler of both loggers is closed and detached.
    """
    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        owner = args[0]
        create_dir(create_name(path=owner.path))
        if owner.disable_logs:
            return func(*args, **kwargs)

        # Create local out_log and err_log
        owner.out_log, owner.err_log = get_logs(
            path=owner.path,
            prefix=owner.prefix,
            step=owner.step,
            can_write_console=owner.can_write_console_log,
            can_write_file=owner.can_write_file_log,
            out_log_path=owner.out_log_path,
            err_log_path=owner.err_log_path
        )

        # Run the function and capture its return value
        result = func(*args, **kwargs)

        # Close and remove handlers from out_log and err_log
        for logger in (owner.out_log, owner.err_log):
            # Iterate over a copy: the handler list is modified in the loop
            for handler in list(logger.handlers):
                handler.close()
                logger.removeHandler(handler)

        return result

    return wrapper_log

400 

401 

def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Send **string** at INFO level to whichever of the two loggers is given.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): local log object.
        global_log (:obj:`logging.Logger`): global log object; its message is
            prefixed with the logs prefix.

    """
    if local_log:
        local_log.info(string)
    if not global_log:
        return
    global_log.info(get_logs_prefix() + string)

415 

416 

def human_readable_time(time_ps: int) -> str:
    """Transform **time_ps** to a human readable string.

    The value is expressed in the first unit (femto to mili seconds) in which
    it is below 1000, truncating towards zero at each step. Values too large
    for the biggest unit are returned as the plain input number.

    Args:
        time_ps (int): Time in pico seconds.

    Returns:
        str: Human readable time.
    """
    units = (
        "femto seconds",
        "pico seconds",
        "nano seconds",
        "micro seconds",
        "mili seconds",
    )
    amount = time_ps * 1000  # start from femto seconds
    for unit in units:
        if amount < 1000:
            return f"{amount} {unit}"
        amount = int(amount / 1000)
    return str(time_ps)

440 

441 

def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None):
    """Warn about keys of **properties** that are not attributes of **obj**.

    Args:
        obj (object): Object whose instance attributes are the accepted property names.
        properties (dict): Properties supplied by the user.
        reserved_properties (list): (None) Extra names that never trigger a warning,
            in addition to "system" and "working_dir_path".
    """
    known = obj.__dict__.keys()
    ignored = {"system", "working_dir_path", *(reserved_properties or [])}
    unknown = {key for key in properties if key not in known} - ignored
    for bad_name in unknown:
        suggestions = difflib.get_close_matches(bad_name, known, n=1, cutoff=0.01)
        suggestion = suggestions[0] if suggestions else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (bad_name, suggestion)
        )

458 

459 

def create_name(
    path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None,
    step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Return file name.

    The non-empty parts are joined as ``<prefix>_<step>_<name>`` (slashes in
    **prefix** become underscores, **name** is stripped) and the result is
    placed under **path** when one is given.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    pieces = []
    if prefix:
        pieces.append(prefix.replace("/", "_"))
    if step:
        pieces.append(step)
    stripped_name = "" if name is None else name.strip()
    if stripped_name:
        pieces.append(stripped_name)

    composed = "_".join(pieces)
    if not path:
        return composed
    return str(Path(path).joinpath(composed)) if composed else str(path)

493 

494 

def write_failed_output(file_name: str):
    """Overwrite **file_name** with the single line ``Error``.

    Args:
        file_name (str): Path of the file to write.
    """
    with open(file_name, "w") as failed_file:
        print("Error", file=failed_file)

498 

499 

def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Best-effort removal of a file or a whole directory tree.

    Any exception raised while inspecting or removing the path is swallowed
    on purpose; callers only learn whether something was removed.

    Args:
        file_name (str): Path of the file or directory to remove.

    Returns:
        str: **file_name** when it was removed, None otherwise.
    """
    try:
        target = pathlib.Path(file_name)
        if not target.exists():
            return None
        if target.is_dir():
            shutil.rmtree(file_name)
            return file_name
        if target.is_file():
            target.unlink()
            return file_name
    except Exception:
        pass
    return None

513 

514 

def rm_file_list(
    file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Remove every path in **file_list** and report which ones were removed.

    Args:
        file_list (:obj:`list` of :obj:`str`): Files or directories to remove.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: paths that were actually removed.
    """
    removed_files = []
    for entry in file_list:
        if rm(entry):
            removed_files.append(str(entry))
    if removed_files and out_log:
        log("Removed: %s" % str(removed_files), out_log)
    return removed_files

522 

523 

def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool:
    """Tell whether every listed output is a non-empty file or a non-empty directory.

    Falsy entries (None, '') are ignored.

    Args:
        output_file_list (list): Output paths to check.

    Returns:
        bool: False as soon as one entry is missing or empty, True otherwise.
    """
    def _has_content(entry) -> bool:
        candidate = Path(str(entry))
        if candidate.is_file() and candidate.stat().st_size > 0:
            return True
        return candidate.is_dir() and any(candidate.iterdir())

    return all(_has_content(entry) for entry in output_file_list if entry)

532 

533 

def copytree_new_files_only(source, destination):
    """
    Recursively mirror **source** into **destination**, copying a file only
    when it is missing in the destination or the source copy has a newer
    modification time.
    """
    if not os.path.exists(destination):
        os.makedirs(destination)

    for current_dir, _subdirs, file_names in os.walk(source):
        # Directory in the destination that mirrors the one being walked
        mirror_dir = os.path.join(destination, os.path.relpath(current_dir, source))
        if not os.path.exists(mirror_dir):
            os.makedirs(mirror_dir)

        for file_name in file_names:
            origin = os.path.join(current_dir, file_name)
            target = os.path.join(mirror_dir, file_name)
            # Skip files whose destination copy is at least as recent
            if os.path.exists(target) and os.path.getmtime(origin) <= os.path.getmtime(target):
                continue
            shutil.copy2(origin, target)

556 

557 

def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str,
                      io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict:
    """Stage the input files in a new unique directory and map paths into the container.

    Args:
        container_path (str): Path to the container executable. When empty,
            **io_dict** is returned untouched and nothing is copied.
        container_volume_path (str): Directory, as seen from inside the container,
            that the returned paths are built under.
        io_dict (dict): Dictionary with "in" and "out" sub-dictionaries mapping
            argument names to host paths.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        dict: Dictionary with "in"/"out" paths translated to **container_volume_path**
        and "unique_dir" set to the host staging directory.
    """
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files COPY and assign INTERNAL PATH
    for file_ref, file_path in io_dict["in"].items():
        if not file_path:
            continue
        if Path(file_path).exists():
            shutil.copy2(file_path, unique_dir)
            # Pass out_log: without a logger the message is silently dropped
            log(f"Copy: {file_path} to {unique_dir}", out_log)
            container_io_dict["in"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )
        else:
            # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
            container_io_dict["in"][file_ref] = file_path

    # OUT files assign INTERNAL PATH
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict

587 

588 

def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy the outputs found in the staging directory back to their host paths.

    Args:
        container_path (str): Path to the container executable. When empty nothing is done.
        container_io_dict (dict): Dictionary with an "out" mapping of argument
            names to container paths and the "unique_dir" staging directory.
        io_dict (dict): Dictionary whose "out" mapping gives the host
            destination of every output.
    """
    if not container_path:
        return

    # OUT files COPY
    for file_ref, file_path in container_io_dict["out"].items():
        if not file_path:
            continue
        staged = Path(container_io_dict["unique_dir"]).joinpath(Path(file_path).name)
        if staged.exists():
            shutil.copy2(str(staged), io_dict["out"][file_ref])

601 

602 

def create_cmd_line(
    cmd: list[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None
) -> list[str]:
    """Wrap **cmd** so it runs inside the container selected by **container_path**.

    The container type is chosen from the ending of **container_path**
    ("singularity", "docker" or "pcocc"). For any other value **cmd** is
    returned unchanged.

    Args:
        cmd (list): Command line to execute, as a list of tokens.
        container_path (str): ('') Path to the container executable.
        host_volume (str): (None) Host directory shared with the container.
        container_volume (str): (None) Mount point of **host_volume** inside the container.
        container_working_dir (str): (None) Working directory inside the container.
        container_user_uid (str): (None) User id used inside the container.
        container_shell_path (str): (None) Shell used to run the command inside the container.
        container_image (str): (None) Container image.
        out_log (:obj:`logging.Logger`): Local log object.
        global_log (:obj:`logging.Logger`): Global log object.

    Returns:
        list: Command line tokens, ending with the double-quoted joined **cmd**
        when a container is used.

    Raises:
        FileNotFoundError: If a missing Singularity image cannot be pulled.
    """
    container_path = container_path or ""
    runtime = str(container_path)

    if runtime.endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist trying to pull it",
                out_log,
                global_log,
            )
            container_image_name = str(Path(str(container_image)).with_suffix(".sif").name)
            singularity_pull_cmd = [
                runtime,
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch()
                if Path(container_image_name).exists():
                    container_image = container_image_name
                else:
                    raise FileNotFoundError
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError

        singularity_cmd: list[str] = [runtime, "exec"]
        # The -e option is left out on a mac because it is still not available there
        if platform != "darwin":
            singularity_cmd.append("-e")
        singularity_cmd += [
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
        ]
        quoted_cmd = ['"' + " ".join(cmd) + '"']
        singularity_cmd += [str(container_shell_path), "-c"]
        return singularity_cmd + quoted_cmd

    if runtime.endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [runtime, "run"]
        if container_working_dir:
            docker_cmd += ["-w", str(container_working_dir)]
        if container_volume:
            docker_cmd += ["-v", str(host_volume) + ":" + str(container_volume)]
        if container_user_uid:
            docker_cmd += ["--user", container_user_uid]
        docker_cmd.append(str(container_image))

        quoted_cmd = ['"' + " ".join(cmd) + '"']
        docker_cmd += [str(container_shell_path), "-c"]
        return docker_cmd + quoted_cmd

    if runtime.endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [runtime, "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd += ["--cwd", str(container_working_dir)]
        if container_volume:
            pcocc_cmd += ["--mount", str(host_volume) + ":" + str(container_volume)]
        if container_user_uid:
            pcocc_cmd += ["--user", container_user_uid]

        quoted_cmd = ['\\"' + " ".join(cmd) + '\\"']
        pcocc_cmd += [str(container_shell_path), "-c"]
        return pcocc_cmd + quoted_cmd

    # Not using any container
    return cmd

700 

701 

def get_doc_dicts(doc: Optional[str]):
    """Parse a building-block docstring into argument and property dictionaries.

    The docstring must contain a line starting with "Args", one starting with
    "properties" and one starting with "Examples" (case-insensitive), in that
    order. Lines between "Args" and "properties" are parsed as arguments and
    lines between "properties" and "Examples" as properties. Lines that do not
    match the expected layout are skipped.

    Args:
        doc (str): Docstring to parse.

    Returns:
        :obj:`tuple` of :obj:`dict` and :obj:`dict`: argument name -> parsed
        fields (with "formats" as an extension -> edam mapping) and property
        name -> parsed fields (with "values" as a value -> description mapping
        or None).

    Raises:
        StopIteration: If one of the three section header lines is missing.
    """
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    # Work on stripped, non-blank lines only
    doc_lines = [line.strip() for line in str(doc).splitlines() if line.strip()]
    args_index = doc_lines.index(
        next(line for line in doc_lines if line.lower().startswith("args"))
    )
    properties_index = doc_lines.index(
        next(line for line in doc_lines if line.lower().startswith("properties"))
    )
    examples_index = doc_lines.index(
        next(line for line in doc_lines if line.lower().startswith("examples"))
    )
    arguments_lines_list = doc_lines[args_index + 1: properties_index]
    properties_lines_list = doc_lines[properties_index + 1: examples_index]

    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        match_argument = regex_argument.match(argument_line)
        if match_argument is None:
            # Not an argument description (an empty dict here would fail on
            # the "formats" lookup below), so skip the line.
            continue
        argument_dict = match_argument.groupdict()
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    doc_properties_dict = {}
    for property_line in properties_lines_list:
        match_property = regex_property.match(property_line)
        if match_property is None:
            # Not a property description; skip instead of failing on the
            # "description" lookup below.
            continue
        property_dict = match_property.groupdict()
        property_dict["values"] = None
        if "Values:" in property_dict["description"]:
            # Split once so a second "Values:" in the text cannot break unpacking
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:", 1)
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict

758 

759 

def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    type: Optional[str] = None,
    extension_list: Optional[list[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Validate the path given for an input/output argument.

    Args:
        path (Path): Path to check. A falsy path is accepted when **optional** is True.
        argument (str): Name of the argument, used in messages.
        optional (bool): Whether the argument may be omitted.
        module_name (str): Name of the calling module, used in messages.
        input_output (str): (None) "in"/"input" or "out"/"output".
        output_files_created (bool): (False) If True, output paths must exist too.
        type (str): (None) Argument type; "dir" disables the extension check.
        extension_list (list): (None) Accepted extensions, lower case, without the dot.
        raise_exception (bool): (True) Raise instead of only warning on a failed check.
        check_extensions (bool): (True) Warn about missing or unexpected extensions.
        out_log (:obj:`logging.Logger`): Input log object.

    Raises:
        FileNotFoundError: When **raise_exception** is True and either
            **input_output** is not recognized or a required path does not exist.
    """
    if optional and not path:
        return None

    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)
        # Keep going after the warning: do not treat the path as an input, so
        # existence is only enforced when output_files_created asks for it.
        input_file = False

    if input_file or output_files_created:
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        if not Path(str(path)).exists():
            log(not_found_error_string, out_log)
            if raise_exception:
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
                )
            warnings.warn(not_found_error_string)

    if check_extensions and extension_list and type != "dir":
        suffix = Path(str(path)).suffix
        if not suffix:
            no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
            # out_log is passed so the message reaches the log file as well
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        elif suffix[1:].lower() not in extension_list:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            log(not_valid_extension_error_string, out_log)
            warnings.warn(not_valid_extension_error_string)

820 

821 

@contextmanager
def change_dir(destination):
    """Context manager for changing directory.

    **destination** is created when it does not exist. The working directory
    in effect on entry is restored on exit, also when the body raises.
    """
    previous_dir = os.getcwd()
    target = Path(destination)
    if not target.exists():
        os.makedirs(destination)
    try:
        os.chdir(destination)
        yield
    finally:
        os.chdir(previous_dir)