Coverage for biobb_common / biobb_common / tools / file_utils.py: 43%
411 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 15:29 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 15:29 +0000
1"""Tools to work with files
2"""
3import difflib
4import functools
5import logging
6import os
7import errno
8import pathlib
9import re
10import shutil
11import uuid
12import warnings
13import zipfile
14from sys import platform
15from pathlib import Path
16import typing
17from typing import Optional, Union
18import sys
19from contextlib import contextmanager
def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str:
    """Return a path inside **parent_dir** that does not exist yet.

    The file name is a random UUID4 followed by **extension**. The file
    itself is not created.

    Args:
        parent_dir (str): (current working directory) Directory that will hold the path.
        extension (str): ('') Text appended verbatim to the generated name.

    Returns:
        str: Resolved path that did not exist at the moment it was checked.
    """
    base_dir = Path(parent_dir or Path.cwd()).resolve()
    suffix = extension or ""
    candidate = base_dir / f"{uuid.uuid4()}{suffix}"
    # Draw a new identifier for as long as the candidate collides with an existing entry
    while candidate.exists():
        candidate = base_dir / f"{uuid.uuid4()}{suffix}"
    return str(candidate)
def create_dir(dir_path: str) -> str:
    """Make sure **dir_path** exists and hand it back as a string.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: Directory dir path.
    """
    target = Path(dir_path)
    if target.exists():
        # Nothing to do when something is already present at that path
        return str(target)
    target.mkdir(parents=True, exist_ok=True)
    return str(target)
def create_stdin_file(intput_string: str) -> str:
    """Write **intput_string** to a new uniquely named ``.stdin`` file.

    Args:
        intput_string (str): Text stored in the file.

    Returns:
        str: Path of the file that was written.
    """
    stdin_path = create_unique_file_path(extension=".stdin")
    Path(stdin_path).write_text(intput_string)
    return stdin_path
def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name. If the
    directory cannot be created it attempts **number_attempts** times in
    total, computing another unique id for every new attempt.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) number of times creating the directory if there's a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.

    Raises:
        FileExistsError: If no directory could be created after **number_attempts** attempts.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    for attempt in range(number_attempts):
        # Clear the umask so that mode 0o777 is applied literally; the finally
        # clause restores it on every exit path, including a failed mkdir.
        old_umask = os.umask(0)
        try:
            Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
        except OSError:
            if out_log:
                out_log.info(new_dir + " Already exists")
                out_log.info("Retrying %i times more" % (number_attempts - attempt - 1))
            new_dir = prefix + str(uuid.uuid4().hex)
            if path:
                new_dir = str(Path(path).joinpath(new_dir))
            if out_log:
                out_log.info("Trying with: " + new_dir)
        else:
            if out_log:
                out_log.info("Directory successfully created: %s" % new_dir)
            return new_dir
        finally:
            os.umask(old_umask)
    raise FileExistsError(
        "Unable to create a unique directory after %i attempts" % number_attempts
    )
def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str:
    """Resolve the directory used to store the workflow results.

    Without **working_dir_path** the resolved current working directory is
    returned. A path that does not exist yet, or any path when **restart** is
    set, is returned resolved and unchanged. Otherwise a trailing ``_<number>``
    is replaced or appended, counting up from 1, until the name is free.
    Nothing is created on disk.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If step result exists do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    candidate = str(Path(working_dir_path).resolve())
    if not Path(candidate).exists() or restart:
        return candidate

    numeric_tail = re.compile(r"_[0-9]+$")
    counter = 1
    while Path(candidate).exists():
        # Drop a previous numeric tail before attaching the next counter value
        candidate = numeric_tail.split(candidate)[0] + "_" + str(counter)
        counter += 1
    return candidate
def zip_list(
    zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into **zip_file** zip file.

    Files are stored flat under their base name. When a base name was already
    stored, the entry is renamed to ``file_<index>_<base name>``, where index
    is the position of the file in the sorted list. Missing parent
    directories of **zip_file** are created.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    file_list = list(file_list)
    try:
        file_list.sort()
    except TypeError:
        # str and Path entries cannot be ordered against each other; fall back
        # to ordering by their string form instead of failing.
        file_list.sort(key=str)
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as zip_f:
        inserted = set()
        for index, f in enumerate(file_list):
            base_name = Path(f).name
            if base_name in inserted:
                base_name = "file_" + str(index) + "_" + base_name
            inserted.add(base_name)
            zip_f.write(f, arcname=base_name)
    if out_log:
        out_log.info("Adding:")
        out_log.info(str(file_list))
        out_log.info("to: " + str(Path(zip_file).resolve()))
def unzip_list(
    zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Extract all files in the zipball file and return a list containing the
    path of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): (current working directory) Path to directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files, each one
        built by joining the destination directory with the archive member name.
    """
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        # extractall() falls back to the current working directory when no
        # destination is given, so the reported paths must use it as well.
        base_dir = Path.cwd() if dest_dir is None else Path(str(dest_dir))
        file_list = [str(base_dir.joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list
def search_topology_files(
    top_file: Union[str, Path], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Collect **top_file** together with every file it includes.

    Lines of the form ``#include "<file>"`` are followed recursively, taking
    the included path relative to the directory of the including file. Files
    that do not exist are left out of the result.

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: paths of the included files followed by
        **top_file** itself; an empty list when **top_file** does not exist.
    """
    top_path = Path(top_file)
    if not top_path.exists():
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return []

    include_regex = re.compile(r"#include\s+\"(.+)\"")
    parent_dir = str(top_path.parent)
    collected: list[str] = []
    with open(top_file) as handle:
        for raw_line in handle:
            hit = include_regex.match(raw_line.strip())
            if hit is None:
                continue
            included = str(Path(parent_dir).joinpath(hit.group(1)))
            # Included files are listed before the file that includes them
            collected.extend(search_topology_files(included, out_log))
    collected.append(str(top_file))
    return collected
def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> list[str]:
    """Compress **top_file** and the files it includes into **zip_file**.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Remove the compressed files that
            are located in the same directory as **top_file**.

    Returns:
        :obj:`list` of :obj:`str`: list of compressed paths.
    """
    topology_files = search_topology_files(top_file, out_log)
    zip_list(zip_file, topology_files, out_log)
    if remove_original_files:
        # Files living outside the directory of the top file are never deleted
        top_parent = Path(top_file).parent
        rm_file_list(
            [name for name in topology_files if Path(name).parent == top_parent],
            out_log,
        )
    return topology_files
def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[Union[pathlib.Path, str]] = None,
) -> str:
    """Extract **zip_file** and return the path of the extracted ".top" file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted. A new
            unique directory is created when it is not provided.

    Returns:
        str: Path to the first extracted file whose name ends with ".top".

    Raises:
        StopIteration: If the zip file does not contain any ".top" file.
    """
    if not unique_dir:
        unique_dir = create_unique_dir()
    extracted = unzip_list(zip_file, unique_dir, out_log)
    top_file = next(filter(lambda name: name.endswith(".top"), extracted))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for extracted_name in extracted:
            out_log.info(extracted_name)
    return top_file
def get_logs_prefix():
    """Return the four-space indentation placed in front of global log messages."""
    return " " * 4
def create_incremental_name(path: Union[Path, str]) -> str:
    """Return **path**, or a numbered variant of it when it already exists.

    While the candidate exists, trailing digits and underscores are stripped
    from its stem and ``_<counter>`` is appended in front of the suffix, with
    the counter starting at 1.

    Args:
        path (str): path of the file.

    Returns:
        str: Incremented name of the file.
    """
    candidate = Path(path)
    counter = 0
    while candidate.exists():
        counter += 1
        base = candidate.stem.rstrip("0123456789_")
        candidate = candidate.with_name(f"{base}_{counter}{candidate.suffix}")
    return str(candidate)
def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    can_write_file: bool = True,
    out_log_path: Optional[Union[str, Path]] = None,
    err_log_path: Optional[Union[str, Path]] = None,
    level: str = "INFO",
    light_format: bool = False,
) -> tuple[logging.Logger, logging.Logger]:
    """Get the out and err Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        can_write_file (bool): (True) If True, write log to the log files.
        out_log_path (str): ('log.out') Path to the out log file.
        err_log_path (str): ('log.err') Path to the err log file.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    out_log_path = out_log_path or "log.out"
    err_log_path = err_log_path or "log.err"
    # Relative log paths are composed from path/prefix/step and made unique
    if not Path(out_log_path).is_absolute():
        out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path)))
    if not Path(err_log_path).is_absolute():
        err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path)))

    # The loggers are registered under the path of their log file
    out_Logger = logging.getLogger(str(out_log_path))
    err_Logger = logging.getLogger(str(err_log_path))

    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")
    else:
        logFormatter = logging.Formatter(
            "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
        )

    if can_write_file:
        # The file handlers open their file lazily (delay=True), so the parent
        # directory of BOTH log files has to exist before the first message.
        create_dir(str(Path(out_log_path).resolve().parent))
        create_dir(str(Path(err_log_path).resolve().parent))

        out_fileHandler = logging.FileHandler(out_log_path, mode="a", encoding=None, delay=True)
        err_fileHandler = logging.FileHandler(err_log_path, mode="a", encoding=None, delay=True)
        out_fileHandler.setFormatter(logFormatter)
        err_fileHandler.setFormatter(logFormatter)

        # Attach the file handlers only when the out logger has no handler yet
        if not len(out_Logger.handlers):
            out_Logger.addHandler(out_fileHandler)
            err_Logger.addHandler(err_fileHandler)

    if can_write_console:
        console_out = logging.StreamHandler(stream=sys.stdout)
        console_err = logging.StreamHandler(stream=sys.stderr)
        console_out.setFormatter(logFormatter)
        console_err.setFormatter(logFormatter)
        # Console handlers are an additional output next to the file handler
        if len(out_Logger.handlers) < 2:
            out_Logger.addHandler(console_out)
            err_Logger.addHandler(console_err)

    out_Logger.setLevel(level)
    err_Logger.setLevel(level)

    return out_Logger, err_Logger
def launchlogger(func):
    """Decorator that gives the first positional argument its out_log and err_log.

    The directory named by the ``path`` attribute of that argument is created
    first. Unless its ``disable_logs`` attribute is set, both loggers are
    built before **func** runs and all their handlers are closed and detached
    once **func** has returned.
    """
    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        instance = args[0]
        create_dir(create_name(path=instance.path))
        if instance.disable_logs:
            return func(*args, **kwargs)

        instance.out_log, instance.err_log = get_logs(
            path=instance.path,
            prefix=instance.prefix,
            step=instance.step,
            can_write_console=instance.can_write_console_log,
            can_write_file=instance.can_write_file_log,
            out_log_path=instance.out_log_path,
            err_log_path=instance.err_log_path
        )

        result = func(*args, **kwargs)

        for logger in (instance.out_log, instance.err_log):
            # Iterate over a snapshot because removeHandler mutates the list
            for handler in list(logger.handlers):
                handler.close()
                logger.removeHandler(handler)

        return result

    return wrapper_log
def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Send **string** at INFO level to whichever of the two loggers is given.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): local log object.
        global_log (:obj:`logging.Logger`): global log object; its message is
            preceded by the prefix returned by get_logs_prefix().
    """
    if local_log:
        local_log.info(string)
    if not global_log:
        return
    global_log.info(get_logs_prefix() + string)
def human_readable_time(time_ps: int) -> str:
    """Express **time_ps** in the largest unit that keeps the value under 1000.

    Args:
        time_ps (int): Time in pico seconds.

    Returns:
        str: Human readable time. When even the last unit is not enough, the
        plain string of **time_ps** is returned without any unit.
    """
    remaining = time_ps * 1000  # start from femto seconds
    for unit_name in (
        "femto seconds",
        "pico seconds",
        "nano seconds",
        "micro seconds",
        "mili seconds",
    ):
        if remaining < 1000:
            return " ".join((str(remaining), unit_name))
        remaining = int(remaining / 1000)
    return str(time_ps)
def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None):
    """Warn about every key of **properties** that is not an attribute of **obj**.

    The names "system" and "working_dir_path" and the names listed in
    **reserved_properties** never trigger a warning. Each warning carries the
    closest attribute name found on **obj**.

    Args:
        obj (object): Object whose instance attributes are the accepted property names.
        properties (dict): Properties to be checked.
        reserved_properties (:obj:`list` of :obj:`str`): (None) Extra names to accept.
    """
    known = obj.__dict__.keys()
    ignored = {"system", "working_dir_path", *(reserved_properties or [])}
    unknown = {name for name in properties if name not in known} - ignored
    for name in unknown:
        # The very low cutoff yields a suggestion for nearly any input
        matches = difflib.get_close_matches(name, known, n=1, cutoff=0.01)
        suggestion = matches[0] if matches else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (name, suggestion)
        )
def create_name(
    path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None,
    step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Compose a file name as ``<path>/<prefix>_<step>_<name>``.

    Empty or missing pieces are skipped. Slashes in **prefix** are replaced
    by underscores and **name** is stripped of surrounding whitespace.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    pieces = (
        prefix.replace("/", "_") if prefix else "",
        step or "",
        "" if name is None else name.strip(),
    )
    composed = "_".join(piece for piece in pieces if piece)
    if not path:
        return composed
    return str(Path(path).joinpath(composed)) if composed else str(path)
def write_failed_output(file_name: str):
    """Replace the content of **file_name** with a single ``Error`` line.

    Args:
        file_name (str): Path of the file to be written.
    """
    Path(file_name).write_text("Error\n")
def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Remove a file or a whole directory tree without ever raising.

    Args:
        file_name (str): Path of the file or directory to be removed.

    Returns:
        **file_name** when it was removed, None when it does not exist or
        the removal failed.
    """
    try:
        target = pathlib.Path(file_name)
        if not target.exists():
            return None
        if target.is_dir():
            shutil.rmtree(file_name)
            return file_name
        if target.is_file():
            target.unlink()
            return file_name
    except Exception:
        # Removal is best effort on purpose: any failure is reported as None
        pass
    return None
def rm_file_list(
    file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Remove every entry of **file_list** and report the ones that were removed.

    Args:
        file_list (:obj:`list` of :obj:`str`): Files or directories to be removed.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: paths that were actually removed.
    """
    removed_files = []
    for candidate in file_list:
        if rm(candidate):
            removed_files.append(str(candidate))
    if removed_files and out_log:
        log("Removed: %s" % str(removed_files), out_log)
    return removed_files
def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool:
    """Tell whether every listed output is a non-empty file or a non-empty directory.

    Falsy entries (None, empty string) are skipped.

    Args:
        output_file_list (:obj:`list` of :obj:`str`): Paths to be checked.

    Returns:
        bool: False as soon as one entry is missing or empty, True otherwise.
    """
    for entry in output_file_list:
        if not entry:
            continue
        candidate = Path(str(entry))
        if candidate.is_file() and candidate.stat().st_size > 0:
            continue
        if candidate.is_dir() and any(candidate.iterdir()):
            continue
        return False
    return True
def copytree_new_files_only(source, destination):
    """
    Recursively copies files from source to destination when they are missing
    in the destination or when the source file has a newer modification time.
    """
    if not os.path.exists(destination):
        os.makedirs(destination)

    for current_dir, _subdirs, file_names in os.walk(source):
        # Mirror the directory being walked below the destination
        target_dir = os.path.join(destination, os.path.relpath(current_dir, source))
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        for file_name in file_names:
            origin = os.path.join(current_dir, file_name)
            target = os.path.join(target_dir, file_name)
            # Leave alone targets that are at least as recent as their origin
            if os.path.exists(target) and os.path.getmtime(origin) <= os.path.getmtime(target):
                continue
            shutil.copy2(origin, target)
def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str,
                      io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict:
    """Stage the input files for a container run and map the paths to the container volume.

    Args:
        container_path (str): Path to the container binary. When empty, nothing
            is copied and **io_dict** is returned untouched.
        container_volume_path (str): Path of the volume as seen inside the container.
        io_dict (dict): Dictionary with the "in" and "out" file paths on the host.
        out_log (:obj:`logging.Logger`): Log object that receives the copy messages.

    Returns:
        dict: **io_dict** itself when no container is used. Otherwise a new
        dictionary with the keys "in" and "out", whose file paths point inside
        **container_volume_path**, and "unique_dir", the host directory that
        holds the copied input files.
    """
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files: copy to the staging directory and assign the internal path
    for file_ref, file_path in io_dict["in"].items():
        if not file_path:
            continue
        if Path(file_path).exists():
            shutil.copy2(file_path, unique_dir)
            # Pass the logger along; without it the message is never emitted
            log(f"Copy: {file_path} to {unique_dir}", out_log)
            container_io_dict["in"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )
        else:
            # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
            container_io_dict["in"][file_ref] = file_path

    # OUT files: only assign the internal path
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict
def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy the output files of a container run to their final host paths.

    Args:
        container_path (str): Path to the container binary. When empty nothing is done.
        container_io_dict (dict): Dictionary holding the container "out" paths
            and the "unique_dir" directory where the files are looked up.
        io_dict (dict): Dictionary whose "out" entries are the copy destinations.
    """
    if not container_path:
        return

    for file_ref, file_path in container_io_dict["out"].items():
        if not file_path:
            continue
        # Only the file name of the container path is used for the lookup
        produced = Path(container_io_dict["unique_dir"]) / Path(file_path).name
        if produced.exists():
            shutil.copy2(str(produced), io_dict["out"][file_ref])
def create_cmd_line(
    cmd: list[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None
) -> list[str]:
    """Wrap **cmd** in the launcher command of the selected container runtime.

    The runtime is chosen from the ending of **container_path**:
    "singularity", "docker" or "pcocc". Any other value returns **cmd**
    unchanged. Inside a container the command is joined with spaces, quoted
    and passed to ``<container_shell_path> -c``.

    Args:
        cmd (:obj:`list` of :obj:`str`): Command to be executed.
        container_path (str): ('') Path to the container runtime binary.
        host_volume (str): (None) Host directory to be mounted in the container.
        container_volume (str): (None) Mount point inside the container.
        container_working_dir (str): (None) Working directory inside the container.
        container_user_uid (str): (None) User passed to the ``--user`` option.
        container_shell_path (str): (None) Shell used inside the container.
        container_image (str): (None) Container image to be run.
        out_log (:obj:`logging.Logger`): (None) Local log object.
        global_log (:obj:`logging.Logger`): (None) Global log object.

    Returns:
        :obj:`list` of :obj:`str`: Command line ready to be launched.

    Raises:
        FileNotFoundError: If a missing Singularity image could not be pulled.
    """
    runtime = str(container_path or "")

    if runtime.endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist trying to pull it",
                out_log,
                global_log,
            )
            container_image_name = str(Path(str(container_image)).with_suffix(".sif").name)
            singularity_pull_cmd = [
                runtime,
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch()
                if not Path(container_image_name).exists():
                    raise FileNotFoundError
                container_image = container_image_name
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError
        # The -e option is left out on mac because it is not available there
        exec_flags = [] if platform == "darwin" else ["-e"]
        return [
            runtime,
            "exec",
            *exec_flags,
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
            str(container_shell_path),
            "-c",
            '"' + " ".join(cmd) + '"',
        ]

    if runtime.endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [runtime, "run"]
        if container_working_dir:
            docker_cmd += ["-w", str(container_working_dir)]
        if container_volume:
            docker_cmd += ["-v", str(host_volume) + ":" + str(container_volume)]
        if container_user_uid:
            docker_cmd += ["--user", container_user_uid]
        docker_cmd += [
            str(container_image),
            str(container_shell_path),
            "-c",
            '"' + " ".join(cmd) + '"',
        ]
        return docker_cmd

    if runtime.endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [runtime, "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd += ["--cwd", str(container_working_dir)]
        if container_volume:
            pcocc_cmd += ["--mount", str(host_volume) + ":" + str(container_volume)]
        if container_user_uid:
            pcocc_cmd += ["--user", container_user_uid]
        pcocc_cmd += [
            str(container_shell_path),
            "-c",
            '\\"' + " ".join(cmd) + '\\"',
        ]
        return pcocc_cmd

    # No container runtime selected: the command is launched as it is
    return cmd
def get_doc_dicts(doc: Optional[str]):
    """Parse the arguments and properties described in a docstring.

    The docstring must contain, in this order, a line starting with "Args",
    one starting with "Properties" and one starting with "Examples" (case
    insensitive). The lines between them are parsed with regular expressions;
    lines that do not follow the expected layout are skipped.

    Args:
        doc (str): Docstring to be parsed.

    Returns:
        :obj:`tuple` of :obj:`dict` and :obj:`dict`: The arguments dictionary,
        keyed by argument name, and the properties dictionary, keyed by
        property name. Each value holds the named groups captured for that
        line; "formats" (arguments) maps extensions to edam identifiers and
        "values" (properties) maps accepted values to their description, or
        is None when no "Values:" section is present.

    Raises:
        StopIteration: If one of the three section headers is missing.
    """
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    # Work on the stripped, non-empty lines only
    doc_lines = [line.strip() for line in str(doc).splitlines() if line.strip()]

    def _section_index(keyword: str) -> int:
        # Index of the first line that starts with the section keyword
        return next(
            index for index, line in enumerate(doc_lines)
            if line.lower().startswith(keyword)
        )

    args_index = _section_index("args")
    properties_index = _section_index("properties")
    examples_index = _section_index("examples")
    arguments_lines_list = doc_lines[args_index + 1: properties_index]
    properties_lines_list = doc_lines[properties_index + 1: examples_index]

    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        match_argument = regex_argument.match(argument_line)
        if match_argument is None:
            # Not an argument description (e.g. free text): nothing to record
            continue
        argument_dict = match_argument.groupdict()
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    doc_properties_dict = {}
    for property_line in properties_lines_list:
        match_property = regex_property.match(property_line)
        if match_property is None:
            # Not a property description (e.g. a continuation line)
            continue
        property_dict = match_property.groupdict()
        property_dict["values"] = None
        if "Values:" in property_dict["description"]:
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:")
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict
def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    type: Optional[str] = None,
    extension_list: Optional[list[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Validate the path given for a file argument of a module.

    Args:
        path (Path): Path given for the argument.
        argument (str): Name of the argument, used in the messages.
        optional (bool): If True and **path** is empty, nothing is checked.
        module_name (str): Name of the module, used in the messages.
        input_output (str): (None) "in"/"input" or "out"/"output".
        output_files_created (bool): (False) If True, output paths must exist as well.
        type (str): (None) Kind of path; "dir" disables the extension check.
        extension_list (:obj:`list` of :obj:`str`): (None) Accepted extensions in lower case, without dot.
        raise_exception (bool): (True) Raise instead of only warning when the
            direction cannot be determined or the path does not exist.
        check_extensions (bool): (True) Warn about missing or not accepted extensions.
        out_log (:obj:`logging.Logger`): (None) Log object that receives the messages.

    Raises:
        FileNotFoundError: With **raise_exception** set, when **input_output**
            is not recognized or when a path that must exist does not exist.
    """
    if optional and not path:
        return None

    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        # Unknown direction: when only warning, carry on as for a file that is
        # not an input so the remaining checks can still run.
        input_file = False
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)

    target = Path(str(path))

    if (input_file or output_files_created) and not target.exists():
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        log(not_found_error_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
            )
        warnings.warn(not_found_error_string)

    if check_extensions and extension_list and type != "dir":
        suffix = target.suffix
        if not suffix:
            no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        elif suffix[1:].lower() not in extension_list:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            log(not_valid_extension_error_string, out_log)
            warnings.warn(not_valid_extension_error_string)
@contextmanager
def change_dir(destination):
    """Context manager that runs its body inside **destination**.

    The directory is created when it is missing and the previous working
    directory is restored on exit, also when the body raises.
    """
    previous_dir = os.getcwd()
    target = Path(destination)
    if not target.exists():
        os.makedirs(destination)
    try:
        os.chdir(destination)
        yield
    finally:
        os.chdir(previous_dir)