Coverage for biobb_common/biobb_common/tools/file_utils.py: 45%
383 statements
coverage.py v7.6.10, created at 2025-01-28 11:32 +0000
1"""Tools to work with files
2"""
3import difflib
4import functools
5import logging
6import os
7import errno
8import pathlib
9import re
10import shutil
11import uuid
12import warnings
13import zipfile
14from sys import platform
15from pathlib import Path
16import typing
17from typing import Optional, Union
18import sys

def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str:
    """Return a unique file path in **parent_dir** built from a UUID plus **extension**.

    Args:
        parent_dir (str): (current working directory) Directory that will contain the file.
        extension (str): ('') File extension, including the leading dot.

    Returns:
        str: Absolute path that does not collide with any existing file.
    """
    if not parent_dir:
        parent_dir = Path.cwd()
    if not extension:
        extension = ""
    while True:
        name = f"{uuid.uuid4()}{extension}"
        file_path = Path.joinpath(Path(parent_dir).resolve(), name)
        if not file_path.exists():
            return str(file_path)
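
# Illustrative usage sketch (not part of the original module): shows how
# create_unique_file_path builds a collision-free path; the ".pdb" extension
# is a hypothetical choice for the example.
def _example_create_unique_file_path():
    pdb_path = create_unique_file_path(parent_dir=Path.cwd(), extension=".pdb")
    # The returned path is absolute and guaranteed not to exist yet
    assert not Path(pdb_path).exists()
    return pdb_path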

def create_dir(dir_path: str) -> str:
    """Return the directory **dir_path**, creating it if it does not exist.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: Path to the directory.
    """
    if not Path(dir_path).exists():
        Path(dir_path).mkdir(exist_ok=True, parents=True)
    return str(Path(dir_path))

def create_stdin_file(input_string: str) -> str:
    """Write **input_string** to a unique ".stdin" file and return its path."""
    file_path = create_unique_file_path(extension=".stdin")
    with open(file_path, "w") as file_handler:
        file_handler.write(input_string)
    return file_path

def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory named with a prefix plus a computed unique id. If the
    computed name collides with an existing one, it attempts up to
    **number_attempts** times to create another unique id and create the
    directory with the new name.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String prepended to the computed unique dir name.
        number_attempts (int): (10) Number of retries if there is a name conflict.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Path to the created directory.
    """
    new_dir = prefix + str(uuid.uuid4())
    if path:
        new_dir = str(Path(path).joinpath(new_dir))
    for i in range(number_attempts):
        try:
            oldumask = os.umask(0)
            try:
                Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
            finally:
                os.umask(oldumask)  # Restore the umask even if mkdir fails
            if out_log:
                out_log.info("%s directory successfully created" % new_dir)
            return new_dir
        except OSError:
            if out_log:
                out_log.info(new_dir + " already exists")
                out_log.info("Retrying %i times more" % (number_attempts - i))
            new_dir = prefix + str(uuid.uuid4().hex)
            if path:
                new_dir = str(Path(path).joinpath(new_dir))
            if out_log:
                out_log.info("Trying with: " + new_dir)
    raise FileExistsError(f"Unable to create a unique directory after {number_attempts} attempts")
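
# Illustrative usage sketch (not part of the original module): creates a
# uniquely named step directory under a hypothetical "scratch" parent and
# removes it again with rm (defined later in this module).
def _example_create_unique_dir():
    step_dir = create_unique_dir(path="scratch", prefix="step1_")
    assert Path(step_dir).is_dir()
    rm("scratch")  # Clean up the example parent directory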

def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str:
    """Return the directory **working_dir_path**. If **working_dir_path**
    already exists, a consecutive numerical suffix is appended to it and the
    new path is returned.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): (False) If True, reuse **working_dir_path** as-is even if it already exists.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    working_dir_path = str(Path(working_dir_path).resolve())

    if (not Path(working_dir_path).exists()) or restart:
        return str(Path(working_dir_path))

    cont = 1
    while Path(str(working_dir_path)).exists():
        working_dir_path = (
            re.split(r"_[0-9]+$", str(working_dir_path))[0] + "_" + str(cont)
        )
        cont += 1
    return str(working_dir_path)
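
# Illustrative usage sketch (not part of the original module): demonstrates the
# numeric-suffix behaviour; "results" is a hypothetical directory name.
def _example_get_working_dir_path():
    first = get_working_dir_path("results")  # "results" does not exist yet
    Path(first).mkdir(parents=True, exist_ok=True)
    second = get_working_dir_path("results")  # now resolves to ".../results_1"
    assert second.endswith("_1")
    assert get_working_dir_path("results", restart=True) == first  # reuse as-is
    rm(first)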

def zip_list(
    zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
):
    """Compress all files listed in **file_list** into the **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    file_list = list(file_list)
    file_list.sort()
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, "w") as zip_f:
        inserted = []
        for index, f in enumerate(file_list):
            base_name = Path(f).name
            if base_name in inserted:
                # Disambiguate duplicated base names inside the archive
                base_name = "file_" + str(index) + "_" + base_name
            inserted.append(base_name)
            zip_f.write(f, arcname=base_name)
    if out_log:
        out_log.info("Adding:")
        out_log.info(str(file_list))
        out_log.info("to: " + str(Path(zip_file).resolve()))

def unzip_list(
    zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Extract all files in the zipball file and return a list containing the
    absolute path of the extracted files.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): Path to the directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: List of paths of the extracted files.
    """
    # extractall defaults to the current working directory when dest_dir is
    # None, so mirror that default here to build correct output paths
    dest_dir = dest_dir if dest_dir else Path.cwd()
    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=dest_dir)
        file_list = [str(Path(dest_dir).joinpath(f)) for f in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list
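
# Illustrative usage sketch (not part of the original module): round-trips two
# small files through zip_list/unzip_list; all file names are hypothetical.
def _example_zip_roundtrip():
    work_dir = create_unique_dir(prefix="zip_demo_")
    files = []
    for name in ("a.txt", "b.txt"):
        file_path = Path(work_dir).joinpath(name)
        file_path.write_text("demo\n")
        files.append(str(file_path))
    archive = str(Path(work_dir).joinpath("bundle.zip"))
    zip_list(archive, files)
    extracted = unzip_list(archive, dest_dir=Path(work_dir).joinpath("out"))
    assert sorted(Path(f).name for f in extracted) == ["a.txt", "b.txt"]
    rm(work_dir)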

def search_topology_files(
    top_file: Union[str, Path], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Recursively search the .top and .itp files referenced through #include
    directives to build the list of topology files.

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: List of paths of the found topology files.
    """
    top_dir_name = str(Path(top_file).parent)
    file_list = []
    pattern = re.compile(r"#include\s+\"(.+)\"")
    if Path(top_file).exists():
        with open(top_file) as tf:
            for line in tf:
                include_file = pattern.match(line.strip())
                if include_file:
                    found_file = str(Path(top_dir_name).joinpath(include_file.group(1)))
                    file_list += search_topology_files(found_file, out_log)
    else:
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return file_list
    return file_list + [str(top_file)]

def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> list[str]:
    """Compress the topology files (the .top file plus its included .itp files)
    into the **zip_file** zip file.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Remove the original files once they have been compressed.

    Returns:
        :obj:`list` of :obj:`str`: List of compressed paths.
    """
    file_list = search_topology_files(top_file, out_log)
    zip_list(zip_file, file_list, out_log)
    if remove_original_files:
        rm_file_list(file_list, out_log)
    return file_list

def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[Union[pathlib.Path, str]] = None,
) -> str:
    """Extract all files in **zip_file** and return the path of the extracted
    ".top" file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted.

    Returns:
        str: Path to the extracted ".top" file.
    """
    unique_dir = unique_dir or create_unique_dir()
    top_list = unzip_list(zip_file, unique_dir, out_log)
    top_file = next(name for name in top_list if name.endswith(".top"))
    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for file_name in top_list:
            out_log.info(file_name)
    return top_file
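
# Illustrative usage sketch (not part of the original module): builds a tiny
# fake GROMACS topology (a .top including one .itp), compresses it with
# zip_top and recovers the .top path with unzip_top; file contents are dummies.
def _example_topology_roundtrip():
    work_dir = create_unique_dir(prefix="top_demo_")
    Path(work_dir).joinpath("posre.itp").write_text("; dummy itp\n")
    top = Path(work_dir).joinpath("system.top")
    top.write_text('#include "posre.itp"\n; dummy top\n')
    archive = str(Path(work_dir).joinpath("topology.zip"))
    zip_top(archive, str(top), remove_original_files=False)
    recovered_top = unzip_top(archive)
    assert recovered_top.endswith("system.top")
    rm(str(Path(recovered_top).parent))  # Clean up the extraction directory
    rm(work_dir)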

def get_logs_prefix():
    """Return the four-space prefix used to indent global log messages."""
    return 4 * " "

def create_incremental_name(path: Union[Path, str]) -> str:
    """Add an incremental numeric suffix to the file name until it no longer
    collides with an existing path.

    Args:
        path (str): Path of the file.

    Returns:
        str: First non-existing incremented file name.
    """
    if (path_obj := Path(path)).exists():
        cont = 1
        while path_obj.exists():
            new_name = f'{path_obj.stem.rstrip("0123456789_")}_{cont}{path_obj.suffix}'
            path_obj = path_obj.with_name(new_name)
            cont += 1
    return str(path_obj)
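
# Illustrative usage sketch (not part of the original module): shows how the
# numeric suffix grows past existing files; "log.out" is a hypothetical name.
def _example_create_incremental_name():
    work_dir = create_unique_dir(prefix="incr_demo_")
    target = Path(work_dir).joinpath("log.out")
    assert create_incremental_name(target) == str(target)  # free name, unchanged
    target.write_text("first run\n")
    assert create_incremental_name(target).endswith("log_1.out")
    rm(work_dir)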

def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    out_log_path: Optional[Union[str, Path]] = None,
    err_log_path: Optional[Union[str, Path]] = None,
    level: str = "INFO",
    light_format: bool = False,
) -> tuple[logging.Logger, logging.Logger]:
    """Get the out and err Python Logger objects.

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        out_log_path (str): (None) Path to the out log file.
        err_log_path (str): (None) Path to the err log file.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    prefix = prefix if prefix else ""
    step = step if step else ""
    path = path if path else str(Path.cwd())

    out_log_path = out_log_path or "log.out"
    err_log_path = err_log_path or "log.err"

    # If the paths are not absolute, compose them from path, prefix and step
    if not Path(out_log_path).is_absolute():
        out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path)))
    if not Path(err_log_path).is_absolute():
        err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path)))

    # Create the log directory if it does not exist
    create_dir(str(Path(out_log_path).resolve().parent))

    # Create logging format
    logFormatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    )
    if light_format:
        logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")

    # Create logging objects
    out_Logger = logging.getLogger(str(out_log_path))
    err_Logger = logging.getLogger(str(err_log_path))

    # Create FileHandlers
    out_fileHandler = logging.FileHandler(
        out_log_path, mode="a", encoding=None, delay=False
    )
    err_fileHandler = logging.FileHandler(
        err_log_path, mode="a", encoding=None, delay=False
    )

    # Assign format to the FileHandlers
    out_fileHandler.setFormatter(logFormatter)
    err_fileHandler.setFormatter(logFormatter)

    # Assign the FileHandlers to the logging objects
    if not len(out_Logger.handlers):
        out_Logger.addHandler(out_fileHandler)
        err_Logger.addHandler(err_fileHandler)

    # Create consoleHandler
    consoleHandler = logging.StreamHandler(stream=sys.stdout)
    # Assign format to consoleHandler
    consoleHandler.setFormatter(logFormatter)

    # Assign consoleHandler to the logging objects as additional output
    if can_write_console and len(out_Logger.handlers) < 2:
        out_Logger.addHandler(consoleHandler)
        err_Logger.addHandler(consoleHandler)

    # Set logging level
    out_Logger.setLevel(level)
    err_Logger.setLevel(level)
    return out_Logger, err_Logger
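
# Illustrative usage sketch (not part of the original module): writes one line
# each to step-scoped out/err logs; the prefix and step names are hypothetical.
def _example_get_logs():
    work_dir = create_unique_dir(prefix="logs_demo_")
    out_log, err_log = get_logs(path=work_dir, prefix="wf1", step="step1",
                                can_write_console=False)
    out_log.info("normal output goes here")
    err_log.error("errors go here")
    # The log files are named <prefix>_<step>_log.out / log.err inside work_dir
    assert Path(work_dir).joinpath("wf1_step1_log.out").exists()
    rm(work_dir)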

def launchlogger(func):
    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        create_dir(create_name(path=args[0].path))
        if args[0].disable_logs:
            return func(*args, **kwargs)

        args[0].out_log, args[0].err_log = get_logs(
            path=args[0].path,
            prefix=args[0].prefix,
            step=args[0].step,
            can_write_console=args[0].can_write_console_log,
            out_log_path=args[0].out_log_path,
            err_log_path=args[0].err_log_path,
        )
        value = func(*args, **kwargs)
        # Iterate over copies [:] of the handler lists so they can be modified
        # while we are iterating
        for handler in args[0].out_log.handlers[:]:
            handler.close()
            args[0].out_log.removeHandler(handler)
        for handler in args[0].err_log.handlers[:]:
            handler.close()
            args[0].err_log.removeHandler(handler)
        return value

    return wrapper_log
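
# Illustrative usage sketch (not part of the original module): a minimal class
# carrying the attributes launchlogger expects; every attribute value below is
# a hypothetical default chosen for the example.
class _ExampleStep:
    def __init__(self, path):
        self.path = path
        self.prefix = ""
        self.step = "demo_step"
        self.disable_logs = False
        self.can_write_console_log = False
        self.out_log_path = None
        self.err_log_path = None

    @launchlogger
    def launch(self):
        # out_log/err_log are injected by the decorator before this body runs
        self.out_log.info("running the demo step")
        return 0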

def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None):
    """Log **string** to the local and/or global loggers, if they exist.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): Local log object.
        global_log (:obj:`logging.Logger`): Global log object.
    """
    if local_log:
        local_log.info(string)
    if global_log:
        global_log.info(get_logs_prefix() + string)

def human_readable_time(time_ps: int) -> str:
    """Transform **time_ps** into a human readable string.

    Args:
        time_ps (int): Time in picoseconds.

    Returns:
        str: Human readable time.
    """
    time_units = [
        "femtoseconds",
        "picoseconds",
        "nanoseconds",
        "microseconds",
        "milliseconds",
    ]
    t = time_ps * 1000
    for tu in time_units:
        if t < 1000:
            return str(t) + " " + tu
        t = int(t / 1000)
    # Times of one second or more fall through the unit ladder
    return str(time_ps) + " picoseconds"
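
# Illustrative usage sketch (not part of the original module): a few sample
# conversions following the femto-to-milli unit ladder above.
def _example_human_readable_time():
    assert human_readable_time(0) == "0 femtoseconds"
    assert human_readable_time(500) == "500 picoseconds"
    assert human_readable_time(2000) == "2 nanoseconds"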

def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None):
    """Warn about every key in **properties** that is not an attribute of
    **obj**, suggesting the most similar attribute name."""
    if not reserved_properties:
        reserved_properties = []
    error_properties = set(
        [prop for prop in properties.keys() if prop not in obj.__dict__.keys()]
    )
    error_properties -= set(["system", "working_dir_path"] + list(reserved_properties))
    for error_property in error_properties:
        close_property_list = difflib.get_close_matches(
            error_property, obj.__dict__.keys(), n=1, cutoff=0.01
        )
        close_property = close_property_list[0] if close_property_list else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (error_property, close_property)
        )

def create_name(
    path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None,
    step: Optional[str] = None, name: Optional[str] = None
) -> str:
    """Return file name.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    name = "" if name is None else name.strip()
    if step:
        if name:
            name = step + "_" + name
        else:
            name = step
    if prefix:
        prefix = prefix.replace("/", "_")
        if name:
            name = prefix + "_" + name
        else:
            name = prefix
    if path:
        if name:
            name = str(Path(path).joinpath(name))
        else:
            name = str(path)
    return name
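
# Illustrative usage sketch (not part of the original module): shows how path,
# prefix, step and name are composed; the values are hypothetical.
def _example_create_name():
    assert create_name(prefix="wf1", step="min", name="log.out") == "wf1_min_log.out"
    assert create_name(path="/tmp", name="log.out") == str(Path("/tmp").joinpath("log.out"))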

def write_failed_output(file_name: str):
    """Write a minimal "Error" marker file to **file_name**."""
    with open(file_name, "w") as f:
        f.write("Error\n")

def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Remove the file or directory **file_name**. Return the removed path on
    success, None otherwise."""
    try:
        file_path = pathlib.Path(file_name)
        if file_path.exists():
            if file_path.is_dir():
                shutil.rmtree(file_name)
                return file_name
            if file_path.is_file():
                Path(file_name).unlink()
                return file_name
    except Exception:
        pass
    return None

def rm_file_list(
    file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None
) -> list[str]:
    """Remove every file in **file_list** and return the list of removed paths."""
    removed_files = [str(f) for f in file_list if rm(f)]
    if out_log:
        log("Removed: %s" % str(removed_files), out_log)
    return removed_files

def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool:
    """Return True if every file in **output_file_list** exists and is not empty."""
    for output_file in filter(None, output_file_list):
        output_file = Path(str(output_file))
        if not (output_file.is_file() and output_file.stat().st_size > 0):
            return False
    return True

def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str,
                      io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict:
    """Copy the input files to a unique host directory and map the paths in
    **io_dict** to their container-internal equivalents."""
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files: COPY to the host volume and assign the INTERNAL PATH
    for file_ref, file_path in io_dict["in"].items():
        if file_path:
            if Path(file_path).exists():
                shutil.copy2(file_path, unique_dir)
                log(f"Copy: {file_path} to {unique_dir}", out_log)
                container_io_dict["in"][file_ref] = str(
                    Path(container_volume_path).joinpath(Path(file_path).name)
                )
            else:
                # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
                container_io_dict["in"][file_ref] = file_path

    # OUT files: assign the INTERNAL PATH
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict

def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy the generated output files back from the container unique directory
    to the host paths declared in **io_dict**."""
    if not container_path:
        return

    # OUT files COPY
    for file_ref, file_path in container_io_dict["out"].items():
        if file_path:
            container_file_path = str(
                Path(container_io_dict["unique_dir"]).joinpath(Path(file_path).name)
            )
            if Path(container_file_path).exists():
                shutil.copy2(container_file_path, io_dict["out"][file_ref])

def create_cmd_line(
    cmd: list[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None
) -> list[str]:
    """Wrap **cmd** in a Singularity, Docker or pcocc invocation depending on
    **container_path**; return **cmd** unchanged if no container is used."""
    container_path = container_path or ""
    if str(container_path).endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist, trying to pull it",
                out_log,
                global_log,
            )
            container_image_name = str(Path(str(container_image)).with_suffix(".sif").name)
            singularity_pull_cmd = [
                str(container_path),
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch()
                if Path(container_image_name).exists():
                    container_image = container_image_name
                else:
                    raise FileNotFoundError
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError
        singularity_cmd: list[str] = [
            str(container_path),
            "exec",
            "-e",
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
        ]
        # On macOS remove the -e option because it is not yet available there
        if platform == "darwin":
            if "-e" in singularity_cmd:
                singularity_cmd.remove("-e")

        cmd = ['"' + " ".join(cmd) + '"']
        singularity_cmd.extend([str(container_shell_path), "-c"])
        return singularity_cmd + cmd

    elif str(container_path).endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [str(container_path), "run"]
        if container_working_dir:
            docker_cmd.append("-w")
            docker_cmd.append(str(container_working_dir))
        if container_volume:
            docker_cmd.append("-v")
            docker_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            docker_cmd.append("--user")
            docker_cmd.append(container_user_uid)

        docker_cmd.append(str(container_image))

        cmd = ['"' + " ".join(cmd) + '"']
        docker_cmd.extend([str(container_shell_path), "-c"])
        return docker_cmd + cmd

    elif str(container_path).endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [str(container_path), "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd.append("--cwd")
            pcocc_cmd.append(str(container_working_dir))
        if container_volume:
            pcocc_cmd.append("--mount")
            pcocc_cmd.append(str(host_volume) + ":" + str(container_volume))
        if container_user_uid:
            pcocc_cmd.append("--user")
            pcocc_cmd.append(container_user_uid)

        cmd = ['\\"' + " ".join(cmd) + '\\"']
        pcocc_cmd.extend([str(container_shell_path), "-c"])
        return pcocc_cmd + cmd

    else:
        # log('Not using any container', out_log, global_log)
        return cmd
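
# Illustrative usage sketch (not part of the original module): wraps a tool
# invocation into a Docker command line; every path and image name below is
# hypothetical and no container is actually launched.
def _example_create_cmd_line():
    docker_cmd = create_cmd_line(
        ["gmx", "editconf", "-f", "/data/input.gro"],
        container_path="docker",
        host_volume="/home/user/project",
        container_volume="/data",
        container_image="gromacs/gromacs:latest",
        container_shell_path="/bin/bash",
    )
    # Produces: ['docker', 'run', '-v', '/home/user/project:/data',
    #            'gromacs/gromacs:latest', '/bin/bash', '-c',
    #            '"gmx editconf -f /data/input.gro"']
    assert docker_cmd[:2] == ["docker", "run"]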

def get_doc_dicts(doc: Optional[str]):
    """Parse a biobb-style docstring and return two dictionaries: one describing
    the arguments (with their accepted formats) and one describing the
    properties (with their default and accepted values)."""
    regex_argument = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    regex_argument_formats = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    regex_property = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    regex_property_value = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    doc_lines = list(
        map(str.strip, filter(lambda line: line.strip(), str(doc).splitlines()))
    )
    args_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("args"), doc_lines))
    )
    properties_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("properties"), doc_lines))
    )
    examples_index = doc_lines.index(
        next(filter(lambda line: line.lower().startswith("examples"), doc_lines))
    )
    arguments_lines_list = doc_lines[args_index + 1: properties_index]
    properties_lines_list = doc_lines[properties_index + 1: examples_index]

    doc_arguments_dict = {}
    for argument_line in arguments_lines_list:
        match_argument = regex_argument.match(argument_line)
        argument_dict = match_argument.groupdict() if match_argument is not None else {}
        argument_dict["formats"] = {
            match.group("extension"): match.group("edam")
            for match in regex_argument_formats.finditer(argument_dict["formats"])
        }
        doc_arguments_dict[argument_dict.pop("argument")] = argument_dict

    doc_properties_dict = {}
    for property_line in properties_lines_list:
        match_property = regex_property.match(property_line)
        property_dict = match_property.groupdict() if match_property is not None else {}
        property_dict["values"] = None
        if "Values:" in property_dict["description"]:
            property_dict["description"], property_dict["values"] = property_dict[
                "description"
            ].split("Values:")
            property_dict["values"] = {
                match.group("value"): match.group("description")
                for match in regex_property_value.finditer(property_dict["values"])
                if match.group("value")
            }
        doc_properties_dict[property_dict.pop("property")] = property_dict

    return doc_arguments_dict, doc_properties_dict
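
# Illustrative usage sketch (not part of the original module): a minimal
# docstring in the biobb documentation style that the regular expressions
# above can parse; the argument and property shown are made up.
_EXAMPLE_DOC = """Demo building block.

Args:
    input_pdb_path (str): Input structure. File type: input. Accepted formats: pdb (edam:format_1476).
    properties (dict): Tool properties.
        * **binary_path** (*str*) - ("gmx") Path to the tool executable.

Examples:
    (omitted)
"""


def _example_get_doc_dicts():
    args_dict, props_dict = get_doc_dicts(_EXAMPLE_DOC)
    assert args_dict["input_pdb_path"]["formats"] == {"pdb": "format_1476"}
    assert props_dict["binary_path"]["default_value"] == '"gmx"'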

def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    extension_list: Optional[list[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Validate the **path** received for **argument**: check existence for
    input files (or for output files once created) and check the extension
    against **extension_list**."""
    if optional and not path:
        return None

    if input_output in ["in", "input"]:
        input_file = True
    elif input_output in ["out", "output"]:
        input_file = False
    else:
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)
        input_file = False  # Fall back to treating the file as an output file

    if input_file or output_files_created:
        not_found_error_string = (
            f"Path {path} --- {module_name}: Non-existing {argument} file."
        )
        if not Path(str(path)).exists():
            log(not_found_error_string, out_log)
            if raise_exception:
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
                )
            warnings.warn(not_found_error_string)
    # else:
    #     if not path.parent.exists():
    #         not_found_dir_error_string = f"Path {path.parent} --- {module_name}: Non-existing {argument} directory."
    #         log(not_found_dir_error_string, out_log)
    #         if raise_exception:
    #             raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), not_found_dir_error_string)
    #         warnings.warn(not_found_dir_error_string)

    if check_extensions and extension_list:
        no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
        if not Path(str(path)).suffix:
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        else:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            if Path(str(path)).suffix[1:].lower() not in extension_list:
                log(not_valid_extension_error_string, out_log)
                warnings.warn(not_valid_extension_error_string)