Coverage for biobb_common/biobb_common/tools/test_fixtures.py: 38%

266 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 11:32 +0000

1"""Boiler plate functions for testsys 

2""" 

import codecs
import hashlib
import itertools
import json
import os
import pickle
import shutil
import sys
from pathlib import Path
from typing import Any, Optional, Union

import jsonschema
import numpy as np
from Bio.PDB import Superimposer, PDBParser  # type: ignore

from biobb_common.configuration import settings
from biobb_common.tools import file_utils as fu

17 

18 

def test_setup(test_object, dict_key: Optional[str] = None, config: Optional[str] = None):
    """Add the unitest_dir, test_dir, conf_file_path, properties and path as
    attributes to the **test_object** and create a directory to launch the unitest.

    Args:
        test_object (:obj:`test`): The test object.
        dict_key (str): Key of the test parameters in the yaml config file.
        config (str): Path to the configuration file.
    """
    # Derive the test directory layout from the location of the test module itself
    module_file = str(sys.modules[test_object.__module__].__file__)
    test_object.testfile_dir = str(Path(module_file).resolve().parent)
    test_object.unitest_dir = str(Path(test_object.testfile_dir).parent)
    test_object.test_dir = str(Path(test_object.unitest_dir).parent)
    test_object.data_dir = str(Path(test_object.test_dir).joinpath('data'))
    test_object.reference_dir = str(Path(test_object.test_dir).joinpath('reference'))
    test_object.conf_file_path = config if config else str(Path(test_object.test_dir).joinpath('conf.yml'))

    conf = settings.ConfReader(test_object.conf_file_path)

    if dict_key:
        test_object.properties = conf.get_prop_dic()[dict_key]
        raw_paths = conf.get_paths_dic()[dict_key]
    else:
        test_object.properties = conf.get_prop_dic()
        raw_paths = conf.get_paths_dic()

    # Substitute the placeholder directories (first occurrence only) by the real ones
    test_object.paths = {
        key: value.replace('test_data_dir', test_object.data_dir, 1).replace('test_reference_dir', test_object.reference_dir, 1)
        for key, value in raw_paths.items()
    }

    fu.create_dir(test_object.properties['path'])
    os.chdir(test_object.properties['path'])

49 

50 

51def test_teardown(test_object): 

52 """Remove the **test_object.properties['working_dir_path']** 

53 

54 Args: 

55 test_object (:obj:`test`): The test object. 

56 """ 

57 unitests_path = Path(test_object.properties['path']).resolve().parent 

58 print(f"\nRemoving: {unitests_path}") 

59 shutil.rmtree(unitests_path) 

60 

61 

def exe_success(return_code: int) -> bool:
    """Check if **return_code** is 0

    Args:
        return_code (int): Return code of a process.

    Returns:
        bool: True if return code is equal to 0
    """
    # A process succeeded exactly when its exit status is zero
    return not return_code

72 

73 

def not_empty(file_path: str) -> bool:
    """Check if file exists and is not empty.

    Args:
        file_path (str): Path to the file.

    Returns:
        bool: True if **file_path** exists and is not empty.
    """
    print("Checking if empty file: "+file_path)
    path = Path(file_path)
    # A missing path short-circuits before stat() is attempted
    return path.is_file() and path.stat().st_size > 0

85 

86 

def compare_hash(file_a: str, file_b: str) -> bool:
    """Compute and compare the SHA-256 hashes of two files.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: True if both files have identical SHA-256 digests.
    """
    print("Comparing: ")
    print(" File_A: "+file_a)
    print(" File_B: "+file_b)
    # Context managers so the file handles are closed deterministically
    # (the original left both files open, leaking descriptors).
    with open(file_a, 'rb') as handle_a:
        file_a_hash = hashlib.sha256(handle_a.read()).digest()
    with open(file_b, 'rb') as handle_b:
        file_b_hash = hashlib.sha256(handle_b.read()).digest()
    print(" File_A hash: "+str(file_a_hash))
    print(" File_B hash: "+str(file_b_hash))
    return file_a_hash == file_b_hash

97 

98 

def equal(file_a: str, file_b: str, ignore_list: Optional[list[Union[str, int]]] = None, **kwargs) -> bool:
    """Check if two files are equal.

    Dispatches to a format-specific comparator chosen from the file
    extensions and falls back to a SHA-256 hash comparison.
    """
    if ignore_list:
        # Line by line comparison skipping ignored line numbers / words
        return compare_line_by_line(file_a, file_b, ignore_list)

    def _both(ext) -> bool:
        # True when both paths end with the given extension (or tuple of them)
        return file_a.endswith(ext) and file_b.endswith(ext)

    if _both(".zip"):
        return compare_zip(file_a, file_b)

    if _both(".pdb"):
        return compare_pdb(file_a, file_b, **kwargs)

    # .top and .itp must match pairwise (a .top is never compared to an .itp)
    if any(_both(ext) for ext in (".top", ".itp")):
        return compare_top_itp(file_a, file_b)

    # Formats whose first line is a free-text title/timestamp
    if any(_both(ext) for ext in (".gro", ".prmtop", ".inp", ".par")):
        return compare_ignore_first(file_a, file_b)

    # Binary trajectories: sizes within tolerance is the best cheap proxy
    if _both((".nc", ".netcdf", ".xtc")):
        return compare_size(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    if _both(".xvg"):
        return compare_xvg(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    image_extensions = ('.png', '.jfif', '.ppm', '.tiff', '.jpg', '.dib', '.pgm', '.bmp', '.jpeg', '.pbm', '.jpe', '.apng', '.pnm', '.gif', '.tif')
    if _both(image_extensions):
        return compare_images(file_a, file_b, kwargs.get('percent_tolerance', 1.0))

    return compare_hash(file_a, file_b)

140 

141 

def compare_line_by_line(file_a: str, file_b: str, ignore_list: list[Union[str, int]]) -> bool:
    """Compare two files line by line, skipping ignored lines.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.
        ignore_list (list): Mixed list of 0-based line indices (int) to skip
            and substrings (str) whose presence in a line of **file_a** makes
            that line pair ignored.

    Returns:
        bool: True if every compared line pair is identical and both files
        have the same number of lines.
    """
    print(f"Comparing ignoring lines containing this words: {ignore_list}")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)
    ignore_words = [word for word in ignore_list if isinstance(word, str)]
    sentinel = object()  # unique fill value: never equal to any real line
    with open(file_a) as fa, open(file_b) as fb:
        # zip_longest (not zip) so a file with extra trailing lines is NOT
        # silently reported as equal.
        for index, (line_a, line_b) in enumerate(itertools.zip_longest(fa, fb, fillvalue=sentinel)):
            if index in ignore_list or (isinstance(line_a, str) and any(word in line_a for word in ignore_words)):
                continue
            if line_a != line_b:
                return False
    return True

153 

154 

def equal_txt(file_a: str, file_b: str) -> bool:
    """Check if two text files are equal.

    Thin alias over :func:`compare_hash`: equality means identical bytes.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: True if both files have identical SHA-256 hashes.
    """
    return compare_hash(file_a, file_b)

158 

159 

def compare_zip(zip_a: str, zip_b: str) -> bool:
    """Compare two zip files by unpacking both and comparing their members
    pairwise by file name."""
    print("This is a ZIP comparison!")
    print("Unzipping:")
    print("Creating a unique_dir for: %s" % zip_a)
    dir_a = fu.create_unique_dir()
    members_a = fu.unzip_list(zip_a, dest_dir=dir_a)
    print("Creating a unique_dir for: %s" % zip_b)
    dir_b = fu.create_unique_dir()
    members_b = fu.unzip_list(zip_b, dest_dir=dir_b)

    # Different member counts means the archives cannot be equal
    if len(members_a) != len(members_b):
        return False

    # Match each extracted file from A with its same-named counterpart in B
    return all(
        equal(member_a, str(Path(dir_b).joinpath(Path(member_a).name)))
        for member_a in members_a
    )

180 

181 

def compare_pdb(pdb_a: str, pdb_b: str, rmsd_cutoff: int = 1, remove_hetatm: bool = True, remove_hydrogen: bool = True, **kwargs) -> bool:
    """Compare two PDB files by superimposing them and checking the RMSD.

    Args:
        pdb_a (str): Path to the first PDB file.
        pdb_b (str): Path to the second PDB file.
        rmsd_cutoff (int): RMSD threshold; structures match if RMSD is below it.
        remove_hetatm (bool): Ignore HETATM residues in the comparison.
        remove_hydrogen (bool): Ignore hydrogen atoms in the comparison.

    Returns:
        bool: True if the RMSD between the two structures is below **rmsd_cutoff**.
    """
    print("Checking RMSD between:")
    print(" PDB_A: "+pdb_a)
    print(" PDB_B: "+pdb_b)
    pdb_parser = PDBParser(PERMISSIVE=True, QUIET=True)
    st_a = pdb_parser.get_structure("st_a", pdb_a)
    st_b = pdb_parser.get_structure("st_b", pdb_b)
    if st_a is None or st_b is None:
        print(" One of the PDB structures could not be parsed.")
        return False
    # Compare only the first model of each structure
    st_a = st_a[0]
    st_b = st_b[0]

    if remove_hetatm:
        print(" Ignoring HETAMT in RMSD")
        # HETATM residues have ids starting with 'H_'
        residues_a = [list(res.get_atoms()) for res in st_a.get_residues() if not res.id[0].startswith('H_')]
        residues_b = [list(res.get_atoms()) for res in st_b.get_residues() if not res.id[0].startswith('H_')]
        atoms_a = [atom for residue in residues_a for atom in residue]
        atoms_b = [atom for residue in residues_b for atom in residue]
    else:
        atoms_a = st_a.get_atoms()
        atoms_b = st_b.get_atoms()

    if remove_hydrogen:
        print(" Ignoring Hydrogen atoms in RMSD")
        atoms_a = [atom for atom in atoms_a if not atom.get_name().startswith('H')]
        atoms_b = [atom for atom in atoms_b if not atom.get_name().startswith('H')]

    # Materialize exactly once: get_atoms() returns a generator, and the
    # original code exhausted it with list() for the count print, leaving an
    # empty iterator for set_atoms() when both remove_* flags were False.
    atoms_a_list = list(atoms_a)
    atoms_b_list = list(atoms_b)
    print(" Atoms ALIGNED in PDB_A: "+str(len(atoms_a_list)))
    print(" Atoms ALIGNED in PDB_B: "+str(len(atoms_b_list)))
    super_imposer = Superimposer()
    super_imposer.set_atoms(atoms_a_list, atoms_b_list)
    super_imposer.apply(atoms_b_list)
    super_imposer_rms = super_imposer.rms if super_imposer.rms is not None else float('inf')
    print(' RMS: '+str(super_imposer_rms))
    print(' RMS_CUTOFF: '+str(rmsd_cutoff))
    return super_imposer_rms < rmsd_cutoff

222 

223 

def compare_top_itp(file_a: str, file_b: str) -> bool:
    """Compare GROMACS top/itp files ignoring the first line and ';' comment lines."""
    print("Comparing TOP/ITP:")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)

    def _meaningful_lines(path: str) -> list:
        # First line holds a generation timestamp/title: skip it; also drop
        # ';'-prefixed comment lines, comparing stripped content only.
        with codecs.open(path, 'r', encoding='utf-8', errors='ignore') as handle:
            next(handle)
            return [line.strip() for line in handle if not line.strip().startswith(';')]

    return _meaningful_lines(file_a) == _meaningful_lines(file_b)

234 

235 

def compare_ignore_first(file_a: str, file_b: str) -> bool:
    """Compare two text files line by line, ignoring the first line of each."""
    print("Comparing ignoring first line of both files:")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)

    def _tail(path: str) -> list:
        # Skip the first line (usually a title/timestamp) and strip the rest
        with open(path) as handle:
            next(handle)
            return [line.strip() for line in handle]

    return _tail(file_a) == _tail(file_b)

246 

247 

def compare_size(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare two files by size: both must lie within a band of
    +/- **percent_tolerance** percent around the mean of the two sizes."""
    print("Comparing size of both files:")
    print(f" FILE_A: {file_a}")
    print(f" FILE_B: {file_b}")
    size_a = Path(file_a).stat().st_size
    size_b = Path(file_b).stat().st_size
    mean_size = (size_a + size_b) / 2
    half_band = mean_size * percent_tolerance / 100
    tolerance_low = mean_size - half_band
    tolerance_high = mean_size + half_band
    print(f" SIZE_A: {size_a} bytes")
    print(f" SIZE_B: {size_b} bytes")
    print(f" TOLERANCE: {percent_tolerance}%, Low: {tolerance_low} bytes, High: {tolerance_high} bytes")
    return all(tolerance_low <= size <= tolerance_high for size in (size_a, size_b))

263 

264 

def compare_xvg(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare two XVG data files numerically within a relative tolerance.

    Args:
        file_a (str): Path to the first XVG file.
        file_b (str): Path to the second XVG file.
        percent_tolerance (float): Relative tolerance (percent) for np.allclose.

    Returns:
        bool: True if both files hold data of identical shape and all values
        agree within the relative tolerance.
    """
    # Fixed copy-pasted "size" wording from compare_size
    print("Comparing XVG files:")
    print(f" FILE_A: {file_a}")
    print(f" FILE_B: {file_b}")
    # '@' lines are xmgrace directives, not data
    data_a = np.loadtxt(file_a, comments="@", unpack=True)
    data_b = np.loadtxt(file_b, comments="@", unpack=True)
    # Shape check: the previous zip()-based loop silently truncated, so a
    # file with extra rows or columns could compare equal.
    if np.shape(data_a) != np.shape(data_b):
        print(f" Data shapes differ: {np.shape(data_a)} vs {np.shape(data_b)}")
        return False
    return bool(np.allclose(data_a, data_b, rtol=percent_tolerance / 100))

276 

277 

def compare_images(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare two images by perceptual average hash within a percent tolerance.

    Args:
        file_a (str): Path to the first image.
        file_b (str): Path to the second image.
        percent_tolerance (float): Allowed hash difference as a percentage of
            the average hash size (minimum one bit).

    Returns:
        bool: True if the hash difference is within tolerance. False when the
        optional dependencies (Pillow, imagehash) are not installed.
    """
    # Optional dependencies: imported lazily so the rest of the module works
    # without them.
    try:
        from PIL import Image  # type: ignore
        import imagehash
    except ImportError:
        print("To compare images, please install the following packages: Pillow, imagehash")
        return False

    print("Comparing images of both files:")
    print(f" IMAGE_A: {file_a}")
    print(f" IMAGE_B: {file_b}")
    # Context managers so the image file handles are closed (Image.open is lazy
    # and the original leaked both handles).
    with Image.open(file_a) as image_a:
        hash_a = imagehash.average_hash(image_a)
    with Image.open(file_b) as image_b:
        hash_b = imagehash.average_hash(image_b)
    # Tolerance in bits, never below a single bit
    tolerance = (len(hash_a) + len(hash_b)) / 2 * percent_tolerance / 100
    if tolerance < 1:
        tolerance = 1
    difference = hash_a - hash_b
    print(f" IMAGE_A HASH: {hash_a} SIZE: {len(hash_a)} bits")
    print(f" IMAGE_B HASH: {hash_b} SIZE: {len(hash_b)} bits")
    print(f" TOLERANCE: {percent_tolerance}%, ABS TOLERANCE: {tolerance} bits, DIFFERENCE: {difference} bits")
    if difference > tolerance:
        return False
    return True

302 

303 

def compare_object_pickle(python_object: Any, pickle_file_path: Union[str, Path], **kwargs) -> bool:
    """Compare a python object with the object stored in a pickle file."""
    print(f"Loading pickle file: {pickle_file_path}")
    with open(pickle_file_path, 'rb') as handle:
        pickle_object = pickle.load(handle)

    # Non-dict objects: plain equality is all we can report
    if not (isinstance(python_object, dict) and isinstance(pickle_object, dict)):
        return python_object == pickle_object

    # Dictionaries get a detailed key-by-key diff
    differences = compare_dictionaries(
        python_object,
        pickle_object,
        ignore_keys=kwargs.get('ignore_keys', []),
        compare_values=kwargs.get('compare_values', True),
        ignore_substring=kwargs.get('ignore_substring', ""))
    if not differences:
        return True

    banner = 50*'*'
    print(banner)
    print("OBJECT:")
    print(python_object)
    print(banner)
    print()
    print(banner)
    print("EXPECTED OBJECT:")
    print(pickle_object)
    print(banner)

    print("Differences found:")
    for difference in differences:
        print(f" {difference}")
    return False

331 

332 

def compare_dictionaries(dict1: dict, dict2: dict, path: str = "", ignore_keys: Optional[list[str]] = None, compare_values: bool = True, ignore_substring: str = "") -> list[str]:
    """Compare two dictionaries recursively and return the differences, ignoring specified keys.

    Args:
        dict1 (dict): First dictionary.
        dict2 (dict): Second dictionary.
        path (str): Dotted prefix used for nested keys in the messages.
        ignore_keys (list): Keys to skip at every nesting level.
        compare_values (bool): If False, only key presence is compared.
        ignore_substring (str): Substring stripped from values before the
            endswith-based comparison (used to ignore variable path prefixes).

    Returns:
        list[str]: Human-readable descriptions of every difference found.
    """
    if ignore_keys is None:
        ignore_keys = []

    differences: list[str] = []

    # Union of keys so keys missing from either side are reported
    all_keys = set(dict1.keys()).union(set(dict2.keys()))

    for key in all_keys:
        if key in ignore_keys:
            continue
        # f-string interpolation instead of `path + key`, which raised
        # TypeError for non-string keys (e.g. int keys).
        full_key = f"{path}{key}"
        if key not in dict1:
            differences.append(f"Key '{full_key}' found in dict2 but not in dict1")
        elif key not in dict2:
            differences.append(f"Key '{full_key}' found in dict1 but not in dict2")
        else:
            value1 = dict1[key]
            value2 = dict2[key]
            if isinstance(value1, dict) and isinstance(value2, dict):
                # Recursively compare nested dictionaries
                nested_differences = compare_dictionaries(value1, value2, full_key + ".", ignore_keys, compare_values, ignore_substring)
                differences.extend(nested_differences)
            elif (value1 != value2) and compare_values:
                if ignore_substring:
                    # Treat values as equal when either, after dropping the
                    # ignored substring, is a suffix of the other.
                    if (not str(value1).endswith(str(value2).replace(ignore_substring, ""))) and (not str(value2).endswith(str(value1).replace(ignore_substring, ""))):
                        differences.append(f"Difference at '{full_key}': dict1 has {value1}, dict2 has {value2}")
                else:
                    differences.append(f"Difference at '{full_key}': dict1 has {value1}, dict2 has {value2}")

    return differences

366 

367 

def validate_json(json_file_path: Union[str, Path], json_schema_path: Union[str, Path]) -> bool:
    """
    Validates a JSON file against a provided JSON schema.

    Args:
        json_file_path (str): Path to the JSON file to validate.
        json_schema_path (str): Path to the JSON schema file.

    Returns:
        bool: True if the JSON is valid, False if invalid.
    """
    print("Validating JSON file:")
    print(f" JSON file: {json_file_path}")
    print(f" JSON schema: {json_schema_path}")
    try:
        # Load the JSON file
        with open(json_file_path, 'r') as json_file:
            json_data = json.load(json_file)

        # Load the JSON schema
        with open(json_schema_path, 'r') as schema_file:
            schema = json.load(schema_file)

        # Validate the JSON data against the schema
        jsonschema.validate(instance=json_data, schema=schema)

        return True
    except jsonschema.ValidationError as ve:
        print(f"Validation error: {ve.message}")
        return False
    except jsonschema.SchemaError as se:
        # A malformed schema previously escaped as an unhandled exception,
        # contradicting the documented "False if invalid" contract.
        print(f"Invalid JSON schema: {se.message}")
        return False
    except json.JSONDecodeError as je:
        print(f"Invalid JSON format: {je.msg}")
        return False