Coverage for biobb_common / biobb_common / tools / file_utils.py: 43%
410 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 13:18 +0000
1"""Tools to work with files
2"""
3import difflib
4import functools
5import logging
6import os
7import errno
8import pathlib
9import re
10import shutil
11import uuid
12import warnings
13import zipfile
14from sys import platform
15from pathlib import Path
16import typing
17from typing import Optional, Union
18import sys
19from contextlib import contextmanager
def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str:
    """Return an absolute path to a file name that does not exist yet.

    The name is a random UUID4 plus the optional **extension**, placed under
    **parent_dir** (current working directory by default). Keeps generating
    names until one that does not already exist is found.

    Args:
        parent_dir (str): (None) Directory that will contain the file.
        extension (str): (None) Extension appended to the generated name (include the dot, e.g. ".stdin").

    Returns:
        str: Absolute path of a file that did not exist at check time.
    """
    if not parent_dir:
        parent_dir = Path.cwd()
    if not extension:
        extension = ""
    while True:
        name = f"{uuid.uuid4()}{extension}"
        file_path = Path.joinpath(Path(parent_dir).resolve(), name)
        if not file_path.exists():
            return str(file_path)
def create_dir(dir_path: str) -> str:
    """Return the directory **dir_path**, creating it first when missing.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: Directory dir path.
    """
    target = Path(dir_path)
    if not target.exists():
        # parents=True also creates any missing intermediate directories
        target.mkdir(exist_ok=True, parents=True)
    return str(target)
def create_stdin_file(intput_string: str) -> str:
    """Write **intput_string** to a fresh ``.stdin`` file and return its path.

    Args:
        intput_string (str): Content to be written to the stdin file.

    Returns:
        str: Path of the created ``.stdin`` file.
    """
    stdin_file_path = create_unique_file_path(extension=".stdin")
    Path(stdin_file_path).write_text(intput_string)
    return stdin_file_path
def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name. If the
    computed name collides with an existing file name it attemps
    **number_attempts** times to create another unique id and create
    the directory with the new name.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) number of times creating the directory if there's a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.

    Raises:
        FileExistsError: If every attempt collided with an existing name.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    for i in range(number_attempts):
        # Temporarily clear the umask so the 0o777 mode is applied verbatim;
        # the finally block guarantees it is restored even when mkdir fails
        # (the original code leaked umask(0) on the failure paths).
        oldumask = os.umask(0)
        try:
            Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
            if out_log:
                out_log.info("Directory successfully created: %s" % new_dir)
            return new_dir
        except OSError:
            if out_log:
                out_log.info(new_dir + " Already exists")
                out_log.info("Retrying %i times more" % (number_attempts - i))
            new_dir = prefix + str(uuid.uuid4().hex)
            if path:
                new_dir = str(Path(path).joinpath(new_dir))
            if out_log:
                out_log.info("Trying with: " + new_dir)
        finally:
            os.umask(oldumask)
    raise FileExistsError
def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str:
    """Return the directory **working_dir_path** resolved to an absolute path.

    When the resolved directory already exists (and **restart** is False), a
    consecutive numerical suffix ``_N`` replaces any previous one until a
    free name is found.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If step result exists do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    candidate = str(Path(working_dir_path).resolve())

    if restart or not Path(candidate).exists():
        return candidate

    suffix = 1
    while Path(candidate).exists():
        # Drop any trailing "_<digits>" before appending the next counter
        candidate = re.split(r"_[0-9]+$", candidate)[0] + "_" + str(suffix)
        suffix += 1
    return candidate
def zip_list(
    zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into **zip_file** zip file.

    Duplicate base names are disambiguated with a ``file_<index>_`` prefix so
    no archive entry overwrites another.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    files = list(file_list)
    files.sort()
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as zip_handle:
        used_names = []
        for position, source in enumerate(files):
            entry_name = Path(source).name
            if entry_name in used_names:
                entry_name = "file_" + str(position) + "_" + entry_name
            used_names.append(entry_name)
            zip_handle.write(source, arcname=entry_name)
    if out_log:
        out_log.info("Adding:")
        out_log.info(list(map(lambda x: str(Path(x).resolve().relative_to(Path.cwd())), files)))
        out_log.info("to: " + str(Path(zip_file).resolve()))
def unzip_list(
    zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Extract all files in the zipball file and return a list containing the
    absolute path of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): Path to directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the extracted files.
    """
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        # extractall falls back to the CWD when dest_dir is None; build the
        # returned paths from that same base (the original code produced
        # bogus "None/<name>" paths in that case).
        base_dir = Path(dest_dir) if dest_dir else Path.cwd()
        file_list = [str(base_dir.joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list
def search_topology_files(
    top_file: Union[str, Path], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Recursively collect **top_file** and every file it ``#include``s.

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: list of paths of the topology files
        (included files first, **top_file** last). Missing files are logged
        and skipped.
    """
    parent_dir = str(Path(top_file).parent)
    include_pattern = re.compile(r"#include\s+\"(.+)\"")
    if not Path(top_file).exists():
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return []
    collected: list[str] = []
    with open(top_file) as handle:
        for raw_line in handle:
            include_match = include_pattern.match(raw_line.strip())
            if include_match:
                # Included paths are relative to the directory of top_file
                nested = str(Path(parent_dir).joinpath(include_match.group(1)))
                collected += search_topology_files(nested, out_log)
    return collected + [str(top_file)]
def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> list[str]:
    """Compress the topology **top_file** plus every file it includes into **zip_file**.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Delete the source files once zipped.

    Returns:
        :obj:`list` of :obj:`str`: list of compressed paths.
    """
    topology_files = search_topology_files(top_file, out_log)
    zip_list(zip_file, topology_files, out_log)
    if remove_original_files:
        rm_file_list(topology_files, out_log)
    return topology_files
def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[Union[pathlib.Path, str]] = None,
) -> str:
    """Extract all files in the zip_file and copy the file extracted ".top" file to top_file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted.

    Returns:
        str: Path to the extracted ".top" file.

    Raises:
        StopIteration: If the archive contains no ".top" file.
    """
    unique_dir = unique_dir or create_unique_dir()
    top_list = unzip_list(zip_file, unique_dir, out_log)
    # The first ".top" entry wins; a zipball with none raises StopIteration
    top_file = next(name for name in top_list if name.endswith(".top"))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for file_name in top_list:
            out_log.info(file_name)
    return top_file
def get_logs_prefix():
    """Return the fixed four-space indentation used for global log messages."""
    return "    "
def create_incremental_name(path: Union[Path, str]) -> str:
    """Increment the name of the file by adding a number at the end.

    Args:
        path (str): path of the file.

    Returns:
        str: Incremented name of the file (unchanged when it does not exist yet).
    """
    candidate = Path(path)
    counter = 0
    while candidate.exists():
        counter += 1
        # Strip any previous "_<digits>" counter (and trailing digits) before
        # appending the next one
        stem_base = candidate.stem.rstrip("0123456789_")
        candidate = candidate.with_name(f"{stem_base}_{counter}{candidate.suffix}")
    return str(candidate)
def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    can_write_file: bool = True,
    out_log_path: Optional[Union[str, Path]] = None,
    err_log_path: Optional[Union[str, Path]] = None,
    level: str = "INFO",
    light_format: bool = False,
) -> tuple[logging.Logger, logging.Logger]:
    """Get the error and out Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        can_write_file (bool): (True) If True, write log to the log files.
        out_log_path (str): (None) Path to the out log file.
        err_log_path (str): (None) Path to the err log file.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    out_log_path = out_log_path or "log.out"
    err_log_path = err_log_path or "log.err"
    # If paths are not absolute create and return them
    if not Path(out_log_path).is_absolute():
        out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path)))
    if not Path(err_log_path).is_absolute():
        err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path)))
    # Loggers are keyed by the final log path, so repeated calls with the same
    # path return the same logger instances
    out_Logger = logging.getLogger(str(out_log_path))
    err_Logger = logging.getLogger(str(err_log_path))

    # Create logging format
    logFormatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    )
    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")

    if can_write_file:
        prefix = prefix if prefix else ""
        step = step if step else ""
        path = path if path else str(Path.cwd())

        # Create dir if it not exists
        create_dir(str(Path(out_log_path).resolve().parent))

        # Create FileHandler (delay=True postpones opening until the first record)
        out_fileHandler = logging.FileHandler(out_log_path, mode="a", encoding=None, delay=True)
        err_fileHandler = logging.FileHandler(err_log_path, mode="a", encoding=None, delay=True)
        # Asign format to FileHandler
        out_fileHandler.setFormatter(logFormatter)
        err_fileHandler.setFormatter(logFormatter)

        # Assign FileHandler to logging object; the handler-count guard avoids
        # duplicated log lines when get_logs is called again for the same path
        if not len(out_Logger.handlers):
            out_Logger.addHandler(out_fileHandler)
            err_Logger.addHandler(err_fileHandler)

    if can_write_console:
        console_out = logging.StreamHandler(stream=sys.stdout)
        console_err = logging.StreamHandler(stream=sys.stderr)
        console_out.setFormatter(logFormatter)
        console_err.setFormatter(logFormatter)
        # Assign consoleHandler to logging objects as aditional output; again
        # the count guard (< 2) prevents duplicates on re-entry
        if len(out_Logger.handlers) < 2:
            out_Logger.addHandler(console_out)
            err_Logger.addHandler(console_err)

    # Set logging level level
    out_Logger.setLevel(level)
    err_Logger.setLevel(level)

    return out_Logger, err_Logger
def launchlogger(func):
    """Decorator to create the out_log and err_log.

    NOTE(review): args[0] is presumably the biobb building-block instance —
    it must expose the attributes ``path``, ``disable_logs``, ``prefix``,
    ``step``, ``can_write_console_log``, ``can_write_file_log``,
    ``out_log_path`` and ``err_log_path``; confirm against the callers.
    """
    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        # Make sure the step directory exists before any log file is created
        create_dir(create_name(path=args[0].path))
        if args[0].disable_logs:
            return func(*args, **kwargs)

        # Create local out_log and err_log
        args[0].out_log, args[0].err_log = get_logs(
            path=args[0].path,
            prefix=args[0].prefix,
            step=args[0].step,
            can_write_console=args[0].can_write_console_log,
            can_write_file=args[0].can_write_file_log,
            out_log_path=args[0].out_log_path,
            err_log_path=args[0].err_log_path
        )

        # Run the function and capture its return value
        value = func(*args, **kwargs)

        # Close and remove handlers from out_log and err_log
        for log in [args[0].out_log, args[0].err_log]:
            # Create a copy [:] of the handler list to be able to modify it while we are iterating
            handlers = log.handlers[:]
            for handler in handlers:
                handler.close()
                log.removeHandler(handler)

        return value

    return wrapper_log
def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Send **string** to whichever of the two loggers are present.

    The global logger receives the message indented by the standard prefix.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): local log object.
        global_log (:obj:`logging.Logger`): global log object.
    """
    if local_log is not None:
        local_log.info(string)
    if global_log is not None:
        global_log.info(get_logs_prefix() + string)
def human_readable_time(time_ps: int) -> str:
    """Transform **time_ps** to a human readable string.

    Args:
        time_ps (int): Time in pico seconds.

    Returns:
        str: Human readable time (value + unit), or the bare input value when
        it is too large for the biggest unit.
    """
    time_units = [
        "femto seconds",
        "pico seconds",
        "nano seconds",
        "micro seconds",
        "mili seconds",
    ]
    t = time_ps * 1000
    for tu in time_units:
        if t < 1000:
            return str(t) + " " + tu
        # Floor division keeps exact integer values; int(t / 1000) goes
        # through a float and loses precision above 2**53 femtoseconds.
        t //= 1000
    return str(time_ps)
def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None):
    """Warn about every key of **properties** that is not an attribute of **obj**.

    ``system`` and ``working_dir_path`` plus any **reserved_properties** are
    always accepted. For each unknown property the closest attribute name is
    suggested in the warning message.
    """
    accepted = set(reserved_properties or [])
    accepted |= {"system", "working_dir_path"}
    unknown = {prop for prop in properties if prop not in obj.__dict__} - accepted
    for bad_prop in unknown:
        suggestions = difflib.get_close_matches(
            bad_prop, obj.__dict__.keys(), n=1, cutoff=0.01
        )
        suggestion = suggestions[0] if suggestions else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (bad_prop, suggestion)
        )
def create_name(
    path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None,
    step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Compose a file name as ``[path/][prefix_][step_]name``.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    parts = []
    if prefix:
        # Slashes would break the resulting file name
        parts.append(prefix.replace("/", "_"))
    if step:
        parts.append(step)
    stripped_name = name.strip() if name is not None else ""
    if stripped_name:
        parts.append(stripped_name)
    composed = "_".join(parts)
    if path:
        composed = str(Path(path).joinpath(composed)) if composed else str(path)
    return composed
def write_failed_output(file_name: str):
    """Create **file_name** containing the single line ``Error``."""
    Path(file_name).write_text("Error\n")
def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Best-effort removal of a file or a directory tree.

    Returns:
        The original **file_name** when something was removed, ``None`` when
        the path did not exist or the removal failed. Errors are swallowed on
        purpose: callers use this as fire-and-forget cleanup.
    """
    try:
        target = pathlib.Path(file_name)
        if target.is_dir():
            shutil.rmtree(file_name)
            return file_name
        if target.is_file():
            Path(file_name).unlink()
            return file_name
    except Exception:
        pass
    return None
def rm_file_list(
    file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Remove every path in **file_list**, returning the ones actually deleted."""
    deleted = []
    for candidate in file_list:
        if rm(candidate):
            deleted.append(str(candidate))
    if deleted and out_log:
        log("Removed: %s" % str(deleted), out_log)
    return deleted
def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool:
    """Return True when every entry is a non-empty file or a non-empty directory.

    Falsy entries (``None``, ``""``) are skipped, so an empty list passes.
    """
    for entry in filter(None, output_file_list):
        candidate = Path(str(entry))
        is_filled_file = candidate.is_file() and candidate.stat().st_size > 0
        is_filled_dir = candidate.is_dir() and any(candidate.iterdir())
        if not (is_filled_file or is_filled_dir):
            return False
    return True
def copytree_new_files_only(source, destination):
    """
    Recursively copy files from source to destination, skipping any file whose
    destination copy already exists and is at least as recent (by mtime).
    """
    if not os.path.exists(destination):
        os.makedirs(destination)

    for dirpath, _dirnames, filenames in os.walk(source):
        # Mirror the directory layout under the destination
        relative = os.path.relpath(dirpath, source)
        target_dir = os.path.join(destination, relative)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        for name in filenames:
            src_path = os.path.join(dirpath, name)
            dst_path = os.path.join(target_dir, name)
            # Copy when missing, or when the source is strictly newer
            if not os.path.exists(dst_path) or os.path.getmtime(src_path) > os.path.getmtime(dst_path):
                shutil.copy2(src_path, dst_path)
def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str,
                      io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict:
    """Stage the input files of **io_dict** into a fresh unique dir and remap
    every path to its container-internal location.

    Returns **io_dict** untouched when no container engine is configured;
    otherwise a new dict with "in"/"out" remapped and the staging dir under
    "unique_dir".
    """
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files COPY and assign INTERNAL PATH
    for file_ref, file_path in io_dict["in"].items():
        if not file_path:
            continue
        if Path(file_path).exists():
            shutil.copy2(file_path, unique_dir)
            log(f"Copy: {file_path} to {unique_dir}")
            container_io_dict["in"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )
        else:
            # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
            container_io_dict["in"][file_ref] = file_path

    # OUT files assign INTERNAL PATH (nothing to copy yet)
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict
def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy the produced output files from the container staging dir back to
    the host paths recorded in **io_dict**; no-op without a container engine.
    """
    if not container_path:
        return

    # OUT files COPY
    for file_ref, file_path in container_io_dict["out"].items():
        if not file_path:
            continue
        staged = Path(container_io_dict["unique_dir"]).joinpath(Path(file_path).name)
        if staged.exists():
            shutil.copy2(str(staged), io_dict["out"][file_ref])
def create_cmd_line(
    cmd: list[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None
) -> list[str]:
    """Wrap **cmd** in the right container invocation based on the tail of
    **container_path** ("singularity", "docker" or "pcocc"); return **cmd**
    untouched when no known engine is configured.

    Args:
        cmd (:obj:`list` of :obj:`str`): Command line to be executed inside the container.
        container_path (str): ('') Path to the container engine binary.
        host_volume (str): (None) Host directory to be mounted in the container.
        container_volume (str): (None) Mount point of **host_volume** inside the container.
        container_working_dir (str): (None) Working directory inside the container.
        container_user_uid (str): (None) User uid used inside the container.
        container_shell_path (str): (None) Shell binary inside the container used with "-c".
        container_image (str): (None) Container image identifier or file.
        out_log (:obj:`logging.Logger`): Local log object.
        global_log (:obj:`logging.Logger`): Global log object.

    Returns:
        :obj:`list` of :obj:`str`: Full command line ready to be executed.
    """
    container_path = container_path or ""
    if str(container_path).endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist trying to pull it",
                out_log,
                global_log,
            )
            # Pull the image into a local .sif file named after the image
            container_image_name = str(Path(str(container_image)).with_suffix(".sif").name)
            singularity_pull_cmd = [
                str(container_path),
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch()
                if Path(container_image_name).exists():
                    container_image = container_image_name
                else:
                    raise FileNotFoundError
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError
        singularity_cmd: list[str] = [
            str(container_path),
            "exec",
            "-e",
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
        ]
        # If we are working on a mac remove -e option because is still no available
        if platform == "darwin":
            if "-e" in singularity_cmd:
                singularity_cmd.remove("-e")

        # The whole user command is handed to the container shell as a single
        # quoted "-c" argument
        cmd = ['"' + " ".join(cmd) + '"']
        singularity_cmd.extend([str(container_shell_path), "-c"])
        return singularity_cmd + cmd

    elif str(container_path).endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [str(container_path), "run"]
        if container_working_dir:
            docker_cmd.append("-w")
            docker_cmd.append(str(container_working_dir))
        if container_volume:
            docker_cmd.append("-v")
            docker_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            docker_cmd.append("--user")
            docker_cmd.append(container_user_uid)

        docker_cmd.append(str(container_image))

        # Same single quoted "-c" argument scheme as the singularity branch
        cmd = ['"' + " ".join(cmd) + '"']
        docker_cmd.extend([str(container_shell_path), "-c"])
        return docker_cmd + cmd

    elif str(container_path).endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [str(container_path), "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd.append("--cwd")
            pcocc_cmd.append(str(container_working_dir))
        if container_volume:
            pcocc_cmd.append("--mount")
            pcocc_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            pcocc_cmd.append("--user")
            pcocc_cmd.append(container_user_uid)

        # pcocc needs the inner quotes escaped
        cmd = ['\\"' + " ".join(cmd) + '\\"']
        pcocc_cmd.extend([str(container_shell_path), "-c"])
        return pcocc_cmd + cmd

    else:
        # log('Not using any container', out_log, global_log)
        return cmd
def get_doc_dicts(doc: Optional[str]):
    """Parse a biobb class docstring into argument and property dictionaries.

    The docstring must contain (in order) a line starting with "args", one
    starting with "properties" and one starting with "examples"
    (case-insensitive); the lines between them are parsed with the regexes
    below. A line that does not match yields an empty group dict, so a
    malformed docstring raises KeyError on the ``pop`` calls.

    Returns:
        :obj:`tuple` of :obj:`dict` and :obj:`dict`: (arguments dict keyed by
        argument name, properties dict keyed by property name).
    """
    # Argument line: name (type) [optional] description. File type: in/out. [sample link] Accepted formats: ...
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    # Format entry: extension (edam:term)
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    # Property bullet: * **name** (*type*) - (default) [WF property] [range] [values] description
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    # Allowed value entry: value (description)
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    # Strip blank lines and surrounding whitespace before locating sections
    doc_lines = list(
        map(str.strip, filter(lambda line: line.strip(), str(doc).splitlines()))
    )
    args_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("args"), doc_lines))
    )
    properties_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("properties"), doc_lines))
    )
    examples_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("examples"), doc_lines))
    )
    arguments_lines_list = doc_lines[args_index + 1: properties_index]
    properties_lines_list = doc_lines[properties_index + 1: examples_index]

    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        match_argument = regex_argument.match(argument_line)
        argument_dict = match_argument.groupdict() if match_argument is not None else {}
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    doc_properties_dict = {}
    for property_line in properties_lines_list:
        match_property = regex_property.match(property_line)
        property_dict = match_property.groupdict() if match_property is not None else {}
        property_dict["values"] = None
        if "Values:" in property_dict["description"]:
            # Split the trailing "Values:" enumeration out of the description
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:")
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict
def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    type: Optional[str] = None,
    extension_list: Optional[list[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Validate a file **argument** of a biobb module.

    Checks that the file exists (for inputs, or for outputs already created)
    and that its extension belongs to **extension_list**. Problems either
    raise FileNotFoundError (when **raise_exception** is True) or emit a
    warning.

    Args:
        path (Path): File path to validate (may be None when **optional**).
        argument (str): Name of the argument, used in messages.
        optional (bool): Whether a missing path is acceptable.
        module_name (str): Module name, used in messages.
        input_output (str): (None) "in"/"input" or "out"/"output".
        output_files_created (bool): (False) Check output files for existence too.
        type (str): (None) Argument type; "dir" skips extension checks.
        extension_list (list): (None) Accepted extensions (without the dot).
        raise_exception (bool): (True) Raise instead of only warning.
        check_extensions (bool): (True) Enable the extension check.
        out_log (:obj:`logging.Logger`): Local log object.

    Raises:
        FileNotFoundError: On undeterminable direction or missing file when
        **raise_exception** is True.
    """
    if optional and not path:
        return None

    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        # Default to False so the existence check below is skipped when
        # raise_exception is disabled (the original code left input_file
        # unbound here, causing a NameError after the warning).
        input_file = False
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)

    if input_file or output_files_created:
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        if not Path(str(path)).exists():
            log(not_found_error_string, out_log)
            if raise_exception:
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
                )
            warnings.warn(not_found_error_string)

    if check_extensions and extension_list and type != "dir":
        no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
        if not Path(str(path)).suffix:
            log(no_extension_error_string)
            warnings.warn(no_extension_error_string)
        else:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            if Path(str(path)).suffix[1:].lower() not in extension_list:
                log(not_valid_extension_error_string)
                warnings.warn(not_valid_extension_error_string)
@contextmanager
def change_dir(destination):
    """Context manager that temporarily switches the working directory.

    Creates **destination** when it does not exist and always restores the
    previous working directory, even on error.
    """
    previous_dir = os.getcwd()
    if not Path(destination).exists():
        os.makedirs(destination)
    try:
        os.chdir(destination)
        yield
    finally:
        os.chdir(previous_dir)