Coverage for biobb_common/biobb_common/tools/test_fixtures.py: 38%
266 statements
coverage.py v7.6.10, created at 2025-01-28 11:32 +0000

1"""Boiler plate functions for testsys
2"""

import codecs
import hashlib
import json
import os
import pickle
import shutil
import sys
from pathlib import Path
from typing import Any, Optional, Union

import jsonschema
import numpy as np
from Bio.PDB import Superimposer, PDBParser  # type: ignore

from biobb_common.configuration import settings
from biobb_common.tools import file_utils as fu


def test_setup(test_object, dict_key: Optional[str] = None, config: Optional[str] = None):
    """Add the unitest_dir, test_dir, conf_file_path, properties and paths as
    attributes to the **test_object** and create a directory to launch the unit test.

    Args:
        test_object (:obj:`test`): The test object.
        dict_key (str): Key of the test parameters in the yaml config file.
        config (str): Path to the configuration file.
    """
    test_object.testfile_dir = str(Path(Path(str(sys.modules[test_object.__module__].__file__)).resolve()).parent)
    test_object.unitest_dir = str(Path(test_object.testfile_dir).parent)
    test_object.test_dir = str(Path(test_object.unitest_dir).parent)
    test_object.data_dir = str(Path(test_object.test_dir).joinpath('data'))
    test_object.reference_dir = str(Path(test_object.test_dir).joinpath('reference'))
    if config:
        test_object.conf_file_path = config
    else:
        test_object.conf_file_path = str(Path(test_object.test_dir).joinpath('conf.yml'))

    conf = settings.ConfReader(test_object.conf_file_path)

    if dict_key:
        test_object.properties = conf.get_prop_dic()[dict_key]
        test_object.paths = {k: v.replace('test_data_dir', test_object.data_dir, 1).replace('test_reference_dir', test_object.reference_dir, 1) for k, v in conf.get_paths_dic()[dict_key].items()}
    else:
        test_object.properties = conf.get_prop_dic()
        test_object.paths = {k: v.replace('test_data_dir', test_object.data_dir, 1).replace('test_reference_dir', test_object.reference_dir, 1) for k, v in conf.get_paths_dic().items()}

    fu.create_dir(test_object.properties['path'])
    os.chdir(test_object.properties['path'])


def test_teardown(test_object):
    """Remove the unit test working directory (the parent of **test_object.properties['path']**).

    Args:
        test_object (:obj:`test`): The test object.
    """
    unitests_path = Path(test_object.properties['path']).resolve().parent
    print(f"\nRemoving: {unitests_path}")
    shutil.rmtree(unitests_path)
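

# Usage sketch (hypothetical, not part of this module): a typical biobb unit test
# wraps a tool run between test_setup and test_teardown. The class, tool and key
# names below are assumptions for illustration only.
#
#     class TestMyTool:
#         def setup_class(self):
#             test_setup(self, 'mytool')   # reads conf.yml, creates a scratch dir
#
#         def teardown_class(self):
#             test_teardown(self)          # removes the scratch dir
#
#         def test_mytool(self):
#             my_tool(properties=self.properties, **self.paths)
#             assert not_empty(self.paths['output_file_path'])
#             assert equal(self.paths['output_file_path'], self.paths['ref_output_file_path'])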


def exe_success(return_code: int) -> bool:
    """Check if **return_code** is 0.

    Args:
        return_code (int): Return code of a process.

    Returns:
        bool: True if return code is equal to 0.
    """
    return return_code == 0


def not_empty(file_path: str) -> bool:
    """Check if file exists and is not empty.

    Args:
        file_path (str): Path to the file.

    Returns:
        bool: True if **file_path** exists and is not empty.
    """
    print("Checking if file exists and is not empty: " + file_path)
    return Path(file_path).is_file() and Path(file_path).stat().st_size > 0


def compare_hash(file_a: str, file_b: str) -> bool:
    """Compute and compare the SHA-256 hashes of two files."""
    print("Comparing: ")
    print("    File_A: " + file_a)
    print("    File_B: " + file_b)
    # Context managers avoid leaking the file handles left open by bare open().read()
    with open(file_a, 'rb') as f_a:
        file_a_hash = hashlib.sha256(f_a.read()).digest()
    with open(file_b, 'rb') as f_b:
        file_b_hash = hashlib.sha256(f_b.read()).digest()
    print("    File_A hash: " + str(file_a_hash))
    print("    File_B hash: " + str(file_b_hash))
    return file_a_hash == file_b_hash


def equal(file_a: str, file_b: str, ignore_list: Optional[list[Union[str, int]]] = None, **kwargs) -> bool:
    """Check if two files are equal, choosing the comparison method by file extension."""
    if ignore_list:
        # Line-by-line comparison
        return compare_line_by_line(file_a, file_b, ignore_list)

    if file_a.endswith(".zip") and file_b.endswith(".zip"):
        return compare_zip(file_a, file_b)

    if file_a.endswith(".pdb") and file_b.endswith(".pdb"):
        return compare_pdb(file_a, file_b, **kwargs)

    if file_a.endswith(".top") and file_b.endswith(".top"):
        return compare_top_itp(file_a, file_b)

    if file_a.endswith(".itp") and file_b.endswith(".itp"):
        return compare_top_itp(file_a, file_b)

    if file_a.endswith(".gro") and file_b.endswith(".gro"):
        return compare_ignore_first(file_a, file_b)

    if file_a.endswith(".prmtop") and file_b.endswith(".prmtop"):
        return compare_ignore_first(file_a, file_b)

    if file_a.endswith(".inp") and file_b.endswith(".inp"):
        return compare_ignore_first(file_a, file_b)

    if file_a.endswith(".par") and file_b.endswith(".par"):
        return compare_ignore_first(file_a, file_b)

    if file_a.endswith((".nc", ".netcdf", ".xtc")) and file_b.endswith((".nc", ".netcdf", ".xtc")):
        return compare_size(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    if file_a.endswith(".xvg") and file_b.endswith(".xvg"):
        return compare_xvg(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    image_extensions = ('.png', '.jfif', '.ppm', '.tiff', '.jpg', '.dib', '.pgm', '.bmp', '.jpeg', '.pbm', '.jpe', '.apng', '.pnm', '.gif', '.tif')
    if file_a.endswith(image_extensions) and file_b.endswith(image_extensions):
        return compare_images(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    return compare_hash(file_a, file_b)
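

# Dispatch sketch (hypothetical file names): equal() picks the comparison by
# extension, so the same call covers hashes, structures and trajectories.
#
#     equal("out.pdb", "ref.pdb", rmsd_cutoff=2)            # RMSD-based
#     equal("out.xtc", "ref.xtc", percent_tolerance=5.0)    # size-based
#     equal("out.log", "ref.log", ignore_list=[0, "time"])  # line-by-line
#     equal("out.dat", "ref.dat")                           # SHA-256 fallback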


def compare_line_by_line(file_a: str, file_b: str, ignore_list: list[Union[str, int]]) -> bool:
    """Compare two files line by line, skipping lines by index (int) or by contained word (str)."""
    print(f"Comparing, ignoring lines containing these words: {ignore_list}")
    print("    FILE_A: " + file_a)
    print("    FILE_B: " + file_b)
    with open(file_a) as fa, open(file_b) as fb:
        for index, (line_a, line_b) in enumerate(zip(fa, fb)):
            if index in ignore_list or any(word in line_a for word in ignore_list if isinstance(word, str)):
                continue
            elif line_a != line_b:
                return False
        return True
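

# Example (assumed log files): the ignore_list mixes zero-based line indices (int)
# and substrings (str); a line pair is skipped when the index matches or when
# line_a contains any of the substrings.
#
#     compare_line_by_line("run_a.log", "run_b.log", ignore_list=[0, "timestamp", "host:"])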


def equal_txt(file_a: str, file_b: str) -> bool:
    """Check if two text files are equal"""
    return compare_hash(file_a, file_b)


def compare_zip(zip_a: str, zip_b: str) -> bool:
    """ Compare zip files """
    print("This is a ZIP comparison!")
    print("Unzipping:")
    print(f"Creating a unique_dir for: {zip_a}")
    zip_a_dir = fu.create_unique_dir()
    zip_a_list = fu.unzip_list(zip_a, dest_dir=zip_a_dir)
    print(f"Creating a unique_dir for: {zip_b}")
    zip_b_dir = fu.create_unique_dir()
    zip_b_list = fu.unzip_list(zip_b, dest_dir=zip_b_dir)

    if not len(zip_a_list) == len(zip_b_list):
        return False

    for uncompressed_zip_a in zip_a_list:
        uncompressed_zip_b = str(Path(zip_b_dir).joinpath(Path(uncompressed_zip_a).name))
        if not equal(uncompressed_zip_a, uncompressed_zip_b):
            return False

    return True


def compare_pdb(pdb_a: str, pdb_b: str, rmsd_cutoff: int = 1, remove_hetatm: bool = True, remove_hydrogen: bool = True, **kwargs) -> bool:
    """ Compare pdb files by RMSD after superimposition """
    print("Checking RMSD between:")
    print("    PDB_A: " + pdb_a)
    print("    PDB_B: " + pdb_b)
    pdb_parser = PDBParser(PERMISSIVE=True, QUIET=True)
    st_a = pdb_parser.get_structure("st_a", pdb_a)
    st_b = pdb_parser.get_structure("st_b", pdb_b)
    if st_a is None or st_b is None:
        print("    One of the PDB structures could not be parsed.")
        return False
    # Work on the first model of each structure
    st_a = st_a[0]
    st_b = st_b[0]

    if remove_hetatm:
        print("    Ignoring HETATM in RMSD")
        residues_a = [list(res.get_atoms()) for res in st_a.get_residues() if not res.id[0].startswith('H_')]
        residues_b = [list(res.get_atoms()) for res in st_b.get_residues() if not res.id[0].startswith('H_')]
        atoms_a = [atom for residue in residues_a for atom in residue]
        atoms_b = [atom for residue in residues_b for atom in residue]
    else:
        atoms_a = st_a.get_atoms()
        atoms_b = st_b.get_atoms()

    if remove_hydrogen:
        print("    Ignoring Hydrogen atoms in RMSD")
        atoms_a = [atom for atom in atoms_a if not atom.get_name().startswith('H')]
        atoms_b = [atom for atom in atoms_b if not atom.get_name().startswith('H')]

    # Materialize the atom iterators exactly once and reuse the lists below:
    # passing a generator that was already consumed by list() would hand
    # set_atoms() an exhausted iterator
    atoms_a_list = list(atoms_a)
    atoms_b_list = list(atoms_b)
    print("    Atoms ALIGNED in PDB_A: " + str(len(atoms_a_list)))
    print("    Atoms ALIGNED in PDB_B: " + str(len(atoms_b_list)))
    super_imposer = Superimposer()
    super_imposer.set_atoms(atoms_a_list, atoms_b_list)
    super_imposer.apply(atoms_b_list)
    super_imposer_rms = super_imposer.rms if super_imposer.rms is not None else float('inf')
    print('    RMS: ' + str(super_imposer_rms))
    print('    RMS_CUTOFF: ' + str(rmsd_cutoff))
    return super_imposer_rms < rmsd_cutoff
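

# Example (assumed files): superimpose two structures and accept them when the
# RMSD over the aligned (non-HETATM, non-hydrogen) atoms is below 2 Angstroms.
#
#     assert compare_pdb("model.pdb", "reference.pdb", rmsd_cutoff=2)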


def compare_top_itp(file_a: str, file_b: str) -> bool:
    """ Compare top/itp files """
    print("Comparing TOP/ITP:")
    print("    FILE_A: " + file_a)
    print("    FILE_B: " + file_b)
    with codecs.open(file_a, 'r', encoding='utf-8', errors='ignore') as f_a:
        next(f_a)
        with codecs.open(file_b, 'r', encoding='utf-8', errors='ignore') as f_b:
            next(f_b)
            return [line.strip() for line in f_a if not line.strip().startswith(';')] == [line.strip() for line in f_b if not line.strip().startswith(';')]
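

# Example (assumed files): the first line of each file (often a timestamped
# header) and every ';' comment line are ignored, so regenerated GROMACS
# topologies still compare equal.
#
#     assert compare_top_itp("topol.top", "reference/topol.top")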


def compare_ignore_first(file_a: str, file_b: str) -> bool:
    """ Compare two files ignoring the first line """
    print("Comparing ignoring first line of both files:")
    print("    FILE_A: " + file_a)
    print("    FILE_B: " + file_b)
    with open(file_a) as f_a:
        next(f_a)
        with open(file_b) as f_b:
            next(f_b)
            return [line.strip() for line in f_a] == [line.strip() for line in f_b]


def compare_size(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """ Compare two files using size """
    print("Comparing size of both files:")
    print(f"    FILE_A: {file_a}")
    print(f"    FILE_B: {file_b}")
    size_a = Path(file_a).stat().st_size
    size_b = Path(file_b).stat().st_size
    average_size = (size_a + size_b) / 2
    tolerance = average_size * percent_tolerance / 100
    tolerance_low = average_size - tolerance
    tolerance_high = average_size + tolerance
    print(f"    SIZE_A: {size_a} bytes")
    print(f"    SIZE_B: {size_b} bytes")
    print(f"    TOLERANCE: {percent_tolerance}%, Low: {tolerance_low} bytes, High: {tolerance_high} bytes")
    return (tolerance_low <= size_a <= tolerance_high) and (tolerance_low <= size_b <= tolerance_high)
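

# Worked example: for sizes 1000 and 1010 bytes the average is 1005, so a 1%
# tolerance yields the band [994.95, 1015.05]; both sizes fall inside it and
# the files are accepted (hypothetical trajectory names below).
#
#     compare_size("traj_a.xtc", "traj_b.xtc", percent_tolerance=1.0)  # -> True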


def compare_xvg(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """ Compare two xvg files column by column within a relative tolerance """
    print("Comparing XVG data of both files:")
    print(f"    FILE_A: {file_a}")
    print(f"    FILE_B: {file_b}")
    arrays_tuple_a = np.loadtxt(file_a, comments="@", unpack=True)
    arrays_tuple_b = np.loadtxt(file_b, comments="@", unpack=True)
    for array_a, array_b in zip(arrays_tuple_a, arrays_tuple_b):
        if not np.allclose(array_a, array_b, rtol=percent_tolerance / 100):
            return False
    return True


def compare_images(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """ Compare two images using perceptual average hashes """
    try:
        from PIL import Image  # type: ignore
        import imagehash
    except ImportError:
        print("To compare images, please install the following packages: Pillow, imagehash")
        return False

    print("Comparing images of both files:")
    print(f"    IMAGE_A: {file_a}")
    print(f"    IMAGE_B: {file_b}")
    hash_a = imagehash.average_hash(Image.open(file_a))
    hash_b = imagehash.average_hash(Image.open(file_b))
    # Absolute tolerance in hash bits, floored at 1 bit
    tolerance = (len(hash_a) + len(hash_b)) / 2 * percent_tolerance / 100
    if tolerance < 1:
        tolerance = 1
    difference = hash_a - hash_b
    print(f"    IMAGE_A HASH: {hash_a} SIZE: {len(hash_a)} bits")
    print(f"    IMAGE_B HASH: {hash_b} SIZE: {len(hash_b)} bits")
    print(f"    TOLERANCE: {percent_tolerance}%, ABS TOLERANCE: {tolerance} bits, DIFFERENCE: {difference} bits")
    if difference > tolerance:
        return False
    return True
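

# Example (assumed files, requires Pillow and imagehash): with imagehash's
# default 64-bit average hash a 1% tolerance floors to 1 bit, so the two images
# may differ in at most one hash bit; a 5% tolerance allows 3.2 bits.
#
#     assert compare_images("plot.png", "reference/plot.png", percent_tolerance=5.0)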


def compare_object_pickle(python_object: Any, pickle_file_path: Union[str, Path], **kwargs) -> bool:
    """ Compare a python object with a pickle file """
    print(f"Loading pickle file: {pickle_file_path}")
    with open(pickle_file_path, 'rb') as f:
        pickle_object = pickle.load(f)

    # Special case for dictionaries
    if isinstance(python_object, dict) and isinstance(pickle_object, dict):
        differences = compare_dictionaries(python_object, pickle_object, ignore_keys=kwargs.get('ignore_keys', []), compare_values=kwargs.get('compare_values', True), ignore_substring=kwargs.get('ignore_substring', ""))
        if differences:
            print(50*'*')
            print("OBJECT:")
            print(python_object)
            print(50*'*')
            print()
            print(50*'*')
            print("EXPECTED OBJECT:")
            print(pickle_object)
            print(50*'*')

            print("Differences found:")
            for difference in differences:
                print(f"    {difference}")
            return False
        return True

    return python_object == pickle_object
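

# Example (hypothetical object and pickle path): dictionaries get a recursive,
# key-aware comparison; ignore_keys skips volatile entries such as run paths.
#
#     result = {'step': 'mytool', 'path': '/tmp/run_42', 'ok': True}
#     compare_object_pickle(result, "reference/result.pkl", ignore_keys=['path'])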


def compare_dictionaries(dict1: dict, dict2: dict, path: str = "", ignore_keys: Optional[list[str]] = None, compare_values: bool = True, ignore_substring: str = "") -> list[str]:
    """Compare two dictionaries recursively and return a list of the differences, ignoring specified keys."""
    if ignore_keys is None:
        ignore_keys = []

    differences = []

    # Get all keys from both dictionaries
    all_keys = set(dict1.keys()).union(set(dict2.keys()))

    for key in all_keys:
        if key in ignore_keys:
            continue
        if key not in dict1:
            differences.append(f"Key '{path + key}' found in dict2 but not in dict1")
        elif key not in dict2:
            differences.append(f"Key '{path + key}' found in dict1 but not in dict2")
        else:
            value1 = dict1[key]
            value2 = dict2[key]
            if isinstance(value1, dict) and isinstance(value2, dict):
                # Recursively compare nested dictionaries
                nested_differences = compare_dictionaries(value1, value2, path + key + ".", ignore_keys, compare_values, ignore_substring)
                differences.extend(nested_differences)
            elif (value1 != value2) and compare_values:
                if ignore_substring:
                    if (not str(value1).endswith(str(value2).replace(ignore_substring, ""))) and (not str(value2).endswith(str(value1).replace(ignore_substring, ""))):
                        differences.append(f"Difference at '{path + key}': dict1 has {value1}, dict2 has {value2}")
                else:
                    differences.append(f"Difference at '{path + key}': dict1 has {value1}, dict2 has {value2}")

    return differences
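

# Example: nested keys are reported with a dotted path.
#
#     compare_dictionaries({'a': 1, 'b': {'c': 2}}, {'a': 1, 'b': {'c': 3}})
#     # -> ["Difference at 'b.c': dict1 has 2, dict2 has 3"]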


def validate_json(json_file_path: Union[str, Path], json_schema_path: Union[str, Path]) -> bool:
    """
    Validates a JSON file against a provided JSON schema.

    Args:
        json_file_path (str): Path to the JSON file to validate.
        json_schema_path (str): Path to the JSON schema file.

    Returns:
        bool: True if the JSON is valid, False if invalid.
    """
    print("Validating JSON file:")
    print(f"    JSON file: {json_file_path}")
    print(f"    JSON schema: {json_schema_path}")
    try:
        # Load the JSON file
        with open(json_file_path, 'r') as json_file:
            json_data = json.load(json_file)

        # Load the JSON schema
        with open(json_schema_path, 'r') as schema_file:
            schema = json.load(schema_file)

        # Validate the JSON data against the schema
        jsonschema.validate(instance=json_data, schema=schema)

        return True
    except jsonschema.ValidationError as ve:
        print(f"Validation error: {ve.message}")
        return False
    except json.JSONDecodeError as je:
        print(f"Invalid JSON format: {je.msg}")
        return False
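

# Example (assumed files): returns True when the instance satisfies the schema,
# and False (with a printed message) on a validation error or malformed JSON.
#
#     assert validate_json("output.json", "schemas/output.schema.json")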