Coverage for biobb_common / biobb_common / tools / test_fixtures.py: 34%
277 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 13:18 +0000
1"""Boiler plate functions for testsys
2"""
3import os
4import pickle
5from typing import Optional, Union, Any
6from pathlib import Path
7import sys
8import shutil
9import hashlib
10from Bio.PDB import Superimposer, PDBParser # type: ignore
11import codecs
12from biobb_common.configuration import settings
13from biobb_common.tools import file_utils as fu
14import numpy as np
15import json
16import jsonschema
def test_setup(test_object, dict_key: Optional[str] = None, config: Optional[str] = None):
    """Attach the unitest_dir, test_dir, conf_file_path, properties and paths
    to the **test_object** and create/enter the working directory for the unitest.

    Args:
        test_object (:obj:`test`): The test object.
        dict_key (str): Key of the test parameters in the yaml config file.
        config (str): Path to the configuration file.
    """
    module_file = str(sys.modules[test_object.__module__].__file__)
    testfile_dir = Path(module_file).resolve().parent
    test_object.testfile_dir = str(testfile_dir)
    test_object.unitest_dir = str(testfile_dir.parent)
    test_object.test_dir = str(testfile_dir.parent.parent)
    test_object.data_dir = str(Path(test_object.test_dir) / 'data')
    test_object.reference_dir = str(Path(test_object.test_dir) / 'reference')
    # Explicit config wins; otherwise fall back to the conventional conf.yml
    test_object.conf_file_path = config if config else str(Path(test_object.test_dir) / 'conf.yml')

    conf = settings.ConfReader(test_object.conf_file_path)

    def _resolve(path_str: str) -> str:
        # Substitute the placeholder directories with the real test locations
        return path_str.replace('test_data_dir', test_object.data_dir, 1).replace('test_reference_dir', test_object.reference_dir, 1)

    if dict_key:
        test_object.properties = conf.get_prop_dic()[dict_key]
        test_object.paths = {key: _resolve(value) for key, value in conf.get_paths_dic()[dict_key].items()}
    else:
        test_object.properties = conf.get_prop_dic()
        test_object.paths = {key: _resolve(value) for key, value in conf.get_paths_dic().items()}

    fu.create_dir(test_object.properties['path'])
    os.chdir(test_object.properties['path'])
51def test_teardown(test_object):
52 """Remove the **test_object.properties['working_dir_path']**
54 Args:
55 test_object (:obj:`test`): The test object.
56 """
57 if test_object.properties.get('remove_tmp', True):
58 unitests_path = Path(test_object.properties['path']).resolve().parent
59 print(f"\nRemoving: {unitests_path}")
60 shutil.rmtree(unitests_path)
def exe_success(return_code: int) -> bool:
    """Report whether a process finished successfully.

    Args:
        return_code (int): Return code of a process.

    Returns:
        bool: True if return code is equal to 0
    """
    return not return_code
def not_empty(file_path: str) -> bool:
    """Check if file exists and is not empty.

    Args:
        file_path (str): Path to the file.

    Returns:
        bool: True if **file_path** exists and is not empty.
    """
    if file_path.endswith('.zip'):
        print("Checking if empty zip: "+file_path)
        extraction_dir = file_path[:-4] + '_unzipped'
        os.makedirs(extraction_dir, exist_ok=True)
        # A zip counts as non-empty when it contains at least one file
        extracted = fu.unzip_list(file_path, dest_dir=extraction_dir)
        return len(extracted) > 0

    if Path(file_path).is_dir():
        print("Checking if empty dir: "+file_path)
        return len(os.listdir(file_path)) > 0

    print("Checking if empty file: "+file_path)
    target = Path(file_path)
    return target.is_file() and target.stat().st_size > 0
def compare_hash(file_a: str, file_b: str) -> bool:
    """Compute and compare the SHA-256 hashes of two files.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: True if both files have identical contents.
    """
    print("Comparing: ")
    print(" File_A: "+file_a)
    print(" File_B: "+file_b)
    # Path.read_bytes() closes the file handle; the original open(...).read()
    # leaked the descriptors until garbage collection.
    file_a_hash = hashlib.sha256(Path(file_a).read_bytes()).digest()
    file_b_hash = hashlib.sha256(Path(file_b).read_bytes()).digest()
    print(" File_A hash: "+str(file_a_hash))
    print(" File_B hash: "+str(file_b_hash))
    return file_a_hash == file_b_hash
def equal(file_a: str, file_b: str, ignore_list: Optional[list[Union[str, int]]] = None, **kwargs) -> bool:
    """Check if two files are equal, dispatching on their file extensions."""
    if ignore_list:
        # Textual comparison skipping ignored line indices / words
        return compare_line_by_line(file_a, file_b, ignore_list)

    def _both(*extensions) -> bool:
        # True when both paths end with one of the given extensions
        return file_a.endswith(extensions) and file_b.endswith(extensions)

    if _both(".zip"):
        return compare_zip(file_a, file_b)

    if _both(".pdb"):
        return compare_pdb(file_a, file_b, **kwargs)

    if _both(".top") or _both(".itp"):
        return compare_top_itp(file_a, file_b)

    if _both(".gro") or _both(".prmtop") or _both(".inp") or _both(".par"):
        return compare_ignore_first(file_a, file_b)

    if _both(".nc", ".netcdf", ".xtc"):
        return compare_size(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    if _both(".xvg"):
        return compare_xvg(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    image_extensions = ('.png', '.jfif', '.ppm', '.tiff', '.jpg', '.dib', '.pgm', '.bmp', '.jpeg', '.pbm', '.jpe', '.apng', '.pnm', '.gif', '.tif')
    if _both(*image_extensions):
        return compare_images(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    # Fallback: strict byte-level comparison
    return compare_hash(file_a, file_b)
def compare_line_by_line(file_a: str, file_b: str, ignore_list: list[Union[str, int]]) -> bool:
    """Compare two text files line by line, skipping lines whose index appears
    in **ignore_list** or that contain any of its string entries."""
    print(f"Comparing ignoring lines containing this words: {ignore_list}")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)
    ignored_words = [item for item in ignore_list if isinstance(item, str)]
    with open(file_a) as handle_a, open(file_b) as handle_b:
        for line_number, (left, right) in enumerate(zip(handle_a, handle_b)):
            skip = line_number in ignore_list or any(word in left for word in ignored_words)
            if skip:
                continue
            if left != right:
                return False
    return True
def equal_txt(file_a: str, file_b: str) -> bool:
    """Check if two text files are equal (byte-for-byte, via their SHA-256 hashes)."""
    return compare_hash(file_a, file_b)
def compare_zip(zip_a: str, zip_b: str) -> bool:
    """ Compare zip files member by member """
    print("This is a ZIP comparison!")
    print("Unzipping:")
    print("Creating a unique_dir for: %s" % zip_a)
    dir_a = fu.create_unique_dir()
    files_a = fu.unzip_list(zip_a, dest_dir=dir_a)
    print("Creating a unique_dir for: %s" % zip_b)
    dir_b = fu.create_unique_dir()
    files_b = fu.unzip_list(zip_b, dest_dir=dir_b)

    # Different member counts can never match
    if len(files_a) != len(files_b):
        return False

    # Each member of A must equal the like-named member extracted from B
    return all(
        equal(member_a, str(Path(dir_b) / Path(member_a).name))
        for member_a in files_a
    )
def compare_pdb(pdb_a: str, pdb_b: str, rmsd_cutoff: int = 1, remove_hetatm: bool = True, remove_hydrogen: bool = True, **kwargs):
    """Compare two PDB files by superimposing them and checking the RMSD.

    Args:
        pdb_a (str): Path to the first PDB file.
        pdb_b (str): Path to the second PDB file.
        rmsd_cutoff (int): RMSD threshold below which the structures are considered equal.
        remove_hetatm (bool): Ignore HETATM residues in the comparison.
        remove_hydrogen (bool): Ignore hydrogen atoms in the comparison.

    Returns:
        bool: True if the RMSD between both structures is below **rmsd_cutoff**.
    """
    print("Checking RMSD between:")
    print(" PDB_A: "+pdb_a)
    print(" PDB_B: "+pdb_b)
    pdb_parser = PDBParser(PERMISSIVE=True, QUIET=True)
    st_a = pdb_parser.get_structure("st_a", pdb_a)
    st_b = pdb_parser.get_structure("st_b", pdb_b)
    if st_a is None or st_b is None:
        print(" One of the PDB structures could not be parsed.")
        return False
    st_a = st_a[0]
    st_b = st_b[0]

    if remove_hetatm:
        print(" Ignoring HETATM in RMSD")
        residues_a = [list(res.get_atoms()) for res in st_a.get_residues() if not res.id[0].startswith('H_')]
        residues_b = [list(res.get_atoms()) for res in st_b.get_residues() if not res.id[0].startswith('H_')]
        atoms_a = [atom for residue in residues_a for atom in residue]
        atoms_b = [atom for residue in residues_b for atom in residue]
    else:
        # BUGFIX: get_atoms() returns generators. They are consumed several
        # times below (len, set_atoms, apply); an exhausted generator would
        # silently yield zero atoms, so materialize lists up front.
        atoms_a = list(st_a.get_atoms())
        atoms_b = list(st_b.get_atoms())

    if remove_hydrogen:
        print(" Ignoring Hydrogen atoms in RMSD")
        atoms_a = [atom for atom in atoms_a if not atom.get_name().startswith('H')]
        atoms_b = [atom for atom in atoms_b if not atom.get_name().startswith('H')]

    print(" Atoms ALIGNED in PDB_A: "+str(len(atoms_a)))
    print(" Atoms ALIGNED in PDB_B: "+str(len(atoms_b)))
    super_imposer = Superimposer()
    super_imposer.set_atoms(atoms_a, atoms_b)
    super_imposer.apply(atoms_b)
    # Superimposer.rms may be None if nothing was aligned; treat that as "infinitely far"
    super_imposer_rms = super_imposer.rms if super_imposer.rms is not None else float('inf')
    print(' RMS: '+str(super_imposer_rms))
    print(' RMS_CUTOFF: '+str(rmsd_cutoff))
    return super_imposer_rms < rmsd_cutoff
def compare_top_itp(file_a: str, file_b: str) -> bool:
    """Compare two GROMACS top/itp files, skipping the first line and ';' comment lines."""
    print("Comparing TOP/ITP:")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)

    def _significant_lines(path: str) -> list:
        with codecs.open(path, 'r', encoding='utf-8', errors='ignore') as handle:
            next(handle)  # the first line (title/date) always differs
            return [line.strip() for line in handle if not line.strip().startswith(';')]

    return _significant_lines(file_a) == _significant_lines(file_b)
def compare_ignore_first(file_a: str, file_b: str) -> bool:
    """ Compare two files ignoring the first line """
    print("Comparing ignoring first line of both files:")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)

    def _tail(path: str) -> list:
        with open(path) as handle:
            next(handle)  # discard the first line
            return [line.strip() for line in handle]

    return _tail(file_a) == _tail(file_b)
def compare_size(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """ Compare two files using size """
    print("Comparing size of both files:")
    print(f" FILE_A: {file_a}")
    print(f" FILE_B: {file_b}")
    sizes = (Path(file_a).stat().st_size, Path(file_b).stat().st_size)
    size_a, size_b = sizes
    mean_size = sum(sizes) / 2
    margin = mean_size * percent_tolerance / 100
    tolerance_low = mean_size - margin
    tolerance_high = mean_size + margin
    print(f" SIZE_A: {size_a} bytes")
    print(f" SIZE_B: {size_b} bytes")
    print(f" TOLERANCE: {percent_tolerance}%, Low: {tolerance_low} bytes, High: {tolerance_high} bytes")
    # Both sizes must fall inside the tolerance band around their mean
    return all(tolerance_low <= size <= tolerance_high for size in sizes)
def compare_xvg(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare the numeric content of two XVG files column by column.

    Args:
        file_a (str): Path to the first XVG file.
        file_b (str): Path to the second XVG file.
        percent_tolerance (float): Relative tolerance (percent) used by np.allclose.

    Returns:
        bool: True if every column matches within tolerance (and shapes agree).
    """
    # BUGFIX: message/docstring previously said "size" (copy-paste from compare_size)
    print("Comparing XVG content of both files:")
    print(f" FILE_A: {file_a}")
    print(f" FILE_B: {file_b}")
    arrays_tuple_a = np.loadtxt(file_a, comments=["@", '#'], unpack=True)
    arrays_tuple_b = np.loadtxt(file_b, comments=["@", '#'], unpack=True)
    # Different shapes can never be equal; report False instead of letting
    # zip silently truncate columns or np.allclose raise on a broadcast error.
    if np.shape(arrays_tuple_a) != np.shape(arrays_tuple_b):
        return False
    for array_a, array_b in zip(arrays_tuple_a, arrays_tuple_b):
        if not np.allclose(array_a, array_b, rtol=percent_tolerance / 100):
            return False
    return True
def compare_images(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare two image files using perceptual average hashing.

    Args:
        file_a (str): Path to the first image.
        file_b (str): Path to the second image.
        percent_tolerance (float): Allowed hash difference as a percentage of
            the average hash size; at least 1 bit is always tolerated.

    Returns:
        bool: True if the hash difference is within tolerance. Also returns
        False when Pillow/imagehash are not installed.
    """
    # Optional dependencies: degrade gracefully instead of crashing the test run
    try:
        from PIL import Image  # type: ignore
        import imagehash
    except ImportError:
        print("To compare images, please install the following packages: Pillow, imagehash")
        return False

    print("Comparing images of both files:")
    print(f" IMAGE_A: {file_a}")
    print(f" IMAGE_B: {file_b}")
    hash_a = imagehash.average_hash(Image.open(file_a))
    hash_b = imagehash.average_hash(Image.open(file_b))
    # Tolerance in bits, never below 1
    tolerance = max((len(hash_a) + len(hash_b)) / 2 * percent_tolerance / 100, 1)
    difference = hash_a - hash_b
    print(f" IMAGE_A HASH: {hash_a} SIZE: {len(hash_a)} bits")
    print(f" IMAGE_B HASH: {hash_b} SIZE: {len(hash_b)} bits")
    print(f" TOLERANCE: {percent_tolerance}%, ABS TOLERANCE: {tolerance} bits, DIFFERENCE: {difference} bits")
    return difference <= tolerance
def compare_object_pickle(python_object: Any, pickle_file_path: Union[str, Path], **kwargs) -> bool:
    """ Compare a python object with a pickle file """
    # NOTE: pickle files here are trusted local reference fixtures
    print(f"Loading pickle file: {pickle_file_path}")
    with open(pickle_file_path, 'rb') as handle:
        expected_object = pickle.load(handle)

    # Dictionaries get a detailed, key-by-key comparison with a diff report
    if isinstance(python_object, dict) and isinstance(expected_object, dict):
        differences = compare_dictionaries(
            python_object,
            expected_object,
            ignore_keys=kwargs.get('ignore_keys', []),
            compare_values=kwargs.get('compare_values', True),
            ignore_substring=kwargs.get('ignore_substring', ""))
        if not differences:
            return True
        separator = 50*'*'
        print(separator)
        print("OBJECT:")
        print(python_object)
        print(separator)
        print()
        print(separator)
        print("EXPECTED OBJECT:")
        print(expected_object)
        print(separator)
        print("Differences found:")
        for difference in differences:
            print(f" {difference}")
        return False

    return python_object == expected_object
def compare_dictionaries(dict1: dict, dict2: dict, path: str = "", ignore_keys: Optional[list[str]] = None, compare_values: bool = True, ignore_substring: str = "") -> list[str]:
    """Recursively compare two dictionaries and return the list of differences,
    skipping any key listed in **ignore_keys**."""
    ignore_keys = ignore_keys or []
    differences: list[str] = []

    # Union of keys from both sides so missing keys are reported either way
    for key in set(dict1) | set(dict2):
        if key in ignore_keys:
            continue
        full_key = path + key
        if key not in dict1:
            differences.append(f"Key '{full_key}' found in dict2 but not in dict1")
            continue
        if key not in dict2:
            differences.append(f"Key '{full_key}' found in dict1 but not in dict2")
            continue
        value1, value2 = dict1[key], dict2[key]
        if isinstance(value1, dict) and isinstance(value2, dict):
            # Recurse into nested dictionaries, extending the key path
            differences.extend(compare_dictionaries(value1, value2, full_key + ".", ignore_keys, compare_values, ignore_substring))
        elif compare_values and value1 != value2:
            if ignore_substring:
                # Tolerate values that only differ by the ignored substring prefix
                str1, str2 = str(value1), str(value2)
                suffix_match = str1.endswith(str2.replace(ignore_substring, "")) or str2.endswith(str1.replace(ignore_substring, ""))
                if not suffix_match:
                    differences.append(f"Difference at '{full_key}': dict1 has {value1}, dict2 has {value2}")
            else:
                differences.append(f"Difference at '{full_key}': dict1 has {value1}, dict2 has {value2}")

    return differences
def validate_json(json_file_path: Union[str, Path], json_schema_path: Union[str, Path]) -> bool:
    """
    Validates a JSON file against a provided JSON schema.

    Args:
        json_file_path (str): Path to the JSON file to validate.
        json_schema_path (str): Path to the JSON schema file.

    Returns:
        bool: True if the JSON is valid, False if invalid.
    """
    print("Validating JSON file:")
    print(f" JSON file: {json_file_path}")
    print(f" JSON schema: {json_schema_path}")
    try:
        # Load the JSON file
        with open(json_file_path, 'r') as json_file:
            json_data = json.load(json_file)

        # Load the JSON schema
        with open(json_schema_path, 'r') as schema_file:
            schema = json.load(schema_file)

        # Validate the JSON data against the schema
        jsonschema.validate(instance=json_data, schema=schema)

        return True
    except jsonschema.ValidationError as ve:
        print(f"Validation error: {ve.message}")
        return False
    except jsonschema.SchemaError as se:
        # BUGFIX: an invalid schema previously raised an uncaught SchemaError;
        # report it as a failed validation instead of crashing the test run.
        print(f"Invalid JSON schema: {se.message}")
        return False
    except json.JSONDecodeError as je:
        print(f"Invalid JSON format: {je.msg}")
        return False