Coverage for biobb_cmip/cmip/common.py: 44%

372 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 09:52 +0000

1""" Common functions for package biobb_cmip.cmip """ 

2import os 

3import re 

4import json 

5from pathlib import Path 

6from typing import Union, Any, Optional 

7import MDAnalysis as mda # type: ignore 

8from MDAnalysis.topology.guessers import guess_atom_element # type: ignore 

9import uuid 

10import logging 

11import biobb_common.tools.file_utils as fu 

12 

13 

14def get_grid(cmip_log_path: Union[str, Path], external: bool = False) -> tuple[tuple[float, float, float], tuple[float, float, float], dict[str, tuple[float, float, float]]]: 

15 with open(cmip_log_path) as log_file: 

16 first_line = log_file.readline().strip() 

17 if first_line.startswith("titleParam"): 

18 return _get_grid_from_key_value(cmip_log_path, external) 

19 elif first_line.startswith("{"): 

20 return _get_grid_from_box_file(cmip_log_path) 

21 return _get_grid_from_text(cmip_log_path, external) 

22 

23 

24def _get_grid_from_box_file(cmip_box_path: Union[str, Path]) -> tuple[tuple[float, float, float], tuple[float, float, float], dict[str, tuple[float, float, float]]]: 

25 with open(cmip_box_path) as json_file: 

26 grid_dict = json.load(json_file) 

27 origin = grid_dict['origin']['x'], grid_dict['origin']['y'], grid_dict['origin']['z'] 

28 size = grid_dict['size']['x'], grid_dict['size']['y'], grid_dict['size']['z'] 

29 return origin, size, grid_dict['params'] 

30 

31 

32def _get_grid_from_text(cmip_log_path: Union[str, Path], external: bool = False) -> tuple[tuple[float, float, float], tuple[float, float, float], dict[str, tuple[float, float, float]]]: 

33 origin = None 

34 size = None 

35 grid_params: dict[str, Any] = {"CEN": None, "DIM": None, "INT": None} 

36 grid_locators_list = ["AUTOMATIC GRID", "MANUAL GRID"] 

37 if external: 

38 grid_locators_list = ["AUTOMATIC OUTER GRID", "MANUAL OUTER GRID"] 

39 

40 with open(cmip_log_path) as log_file: 

41 inside_automatic_grid = False 

42 for line in log_file: 

43 if line.strip() in grid_locators_list: 

44 inside_automatic_grid = True 

45 if inside_automatic_grid: 

46 origin_match = re.match(r"Grid origin:\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

47 if origin_match: 

48 origin = float(origin_match.group(1)), float(origin_match.group(2)), float(origin_match.group(3)) 

49 size_match = re.match(r"Grid Size:\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

50 if size_match: 

51 size = float(size_match.group(1)), float(size_match.group(2)), float(size_match.group(3)) 

52 int_match = re.match(r"Grid units:\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

53 if int_match: 

54 grid_params['INT'] = float(int_match.group(1)), float(int_match.group(2)), float(int_match.group(3)) 

55 cen_match = re.match(r"Grid center:\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

56 if cen_match: 

57 grid_params['CEN'] = float(cen_match.group(1)), float(cen_match.group(2)), float(cen_match.group(3)) 

58 dim_match = re.match(r"Grid density:\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

59 if dim_match: 

60 grid_params['DIM'] = int(dim_match.group(1)), int(dim_match.group(2)), int(dim_match.group(3)) 

61 if origin and size and grid_params['INT'] and grid_params['CEN'] and grid_params['DIM']: 

62 break 

63 return origin, size, grid_params # type: ignore 

64 

65 

66def _get_grid_from_key_value(cmip_log_path: Union[str, Path], external: bool = False) -> tuple[tuple[float, float, float], tuple[float, float, float], dict[str, tuple[float, float, float]]]: 

67 origin = None 

68 size = None 

69 grid_params: dict[str, Any] = {"CEN": None, "DIM": None, "INT": None} 

70 grid_locators_list = ["AUTOMATIC GRID", "MANUAL GRID"] 

71 if external: 

72 grid_locators_list = ["AUTOMATIC OUTER GRID", "MANUAL OUTER GRID"] 

73 

74 with open(cmip_log_path) as log_file: 

75 inside_automatic_grid = False 

76 for line in log_file: 

77 if line.strip() in grid_locators_list: 

78 inside_automatic_grid = True 

79 if inside_automatic_grid: 

80 origin_match = re.match(r"origin=\s+([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

81 if origin_match: 

82 origin = float(origin_match.group(1)), float(origin_match.group(2)), float(origin_match.group(3)) 

83 size_match = re.match(r"size=\s+([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

84 if size_match: 

85 size = float(size_match.group(1)), float(size_match.group(2)), float(size_match.group(3)) 

86 int_match = re.match(r"spacing=\s+([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

87 if int_match: 

88 grid_params['INT'] = float(int_match.group(1)), float(int_match.group(2)), float(int_match.group(3)) 

89 cen_match = re.match(r"center=\s+([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

90 if cen_match: 

91 grid_params['CEN'] = float(cen_match.group(1)), float(cen_match.group(2)), float(cen_match.group(3)) 

92 dim_match = re.match(r"dim=\s+([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))\s*,\s*([-+]?(?:\d*\.\d+|\d+))", line.strip()) 

93 if dim_match: 

94 grid_params['DIM'] = int(dim_match.group(1)), int(dim_match.group(2)), int(dim_match.group(3)) 

95 if origin and size and grid_params['INT'] and grid_params['CEN'] and grid_params['DIM']: 

96 break 

97 return origin, size, grid_params # type: ignore 

98 

99 

100def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[str] = None) -> str: 

101 if not parent_dir: 

102 parent_dir = Path.cwd() 

103 if not extension: 

104 extension = '' 

105 while True: 

106 name = str(uuid.uuid4())+extension 

107 file_path = Path.joinpath(Path(parent_dir).resolve(), name) 

108 if not file_path.exists(): 

109 return str(file_path) 

110 

111 

112def write_cmip_pdb(input_pdb_path, output_pdb_path, charges_list, elements_list): 

113 with open(input_pdb_path) as inPDB, open(output_pdb_path, 'w') as outPDB: 

114 index = 0 

115 for line in inPDB: 

116 line = line.rstrip() 

117 if not re.match('^ATOM', line) and not re.match('^HETATM', line): 

118 continue 

119 outPDB.write("{}{:8.4f} {}\n".format(line[:54], charges_list[index], elements_list[index])) 

120 index += 1 

121 

122 

123def get_topology_cmip_elements_canonical(input_topology_filename: str) -> list: 

124 """ 

125 This function also accepts pdb files 

126 Args: 

127 input_topology_filename: 

128 

129 Returns: 

130 

131 """ 

132 # Remove forcefield itp references from top file. 

133 if input_topology_filename.lower().endswith('.top'): 

134 with open(input_topology_filename) as tf: 

135 top_lines = tf.readlines() 

136 top_file = create_unique_file_path(parent_dir=Path(input_topology_filename).parent.resolve(), extension='.top') 

137 with open(top_file, 'w') as nt: 

138 for line in top_lines: 

139 if re.search(r"\.ff.*\.itp", line): 

140 continue 

141 nt.write(line) 

142 u = mda.Universe(top_file, topology_format="ITP") 

143 os.unlink(top_file) 

144 else: 

145 u = mda.Universe(input_topology_filename) 

146 # mda_charges = [round(val, 4) for val in u.atoms.charges] 

147 # mda_atom_types = list(guess_types(u.atoms.names)) 

148 mda_atom_types = [] 

149 for atom in u.atoms: # type: ignore 

150 atom_element = guess_atom_element(atom.name) 

151 if atom_element == 'H': 

152 bonded_atom_element = guess_atom_element(atom.bonded_atoms[0].name) 

153 if bonded_atom_element == 'O': 

154 atom_element = 'HO' 

155 elif bonded_atom_element in ['N', 'S']: 

156 atom_element = 'HN' 

157 mda_atom_types.append(atom_element) 

158 return mda_atom_types 

159 

160 

161def get_topology_charges(input_topology_filename: str) -> list: 

162 """ Given a topology which includes charges 

163 Extract those charges and save them in a list to be returned 

164 Supported formats (tested): prmtop, top, psf 

165 """ 

166 # Remove forcefield itp references from top file. 

167 if input_topology_filename.lower().endswith('.top'): 

168 with open(input_topology_filename) as tf: 

169 top_lines = tf.readlines() 

170 top_file = create_unique_file_path(parent_dir=Path(input_topology_filename).parent.resolve(), extension='.top') 

171 

172 with open(top_file, 'w') as nt: 

173 for line in top_lines: 

174 if re.search(r"\.ff.*\.itp", line): 

175 continue 

176 nt.write(line) 

177 u = mda.Universe(top_file, topology_format="ITP") 

178 os.unlink(top_file) 

179 else: 

180 u = mda.Universe(input_topology_filename) 

181 return [round(val, 4) for val in u.atoms.charges] # type: ignore 

182 

183 

184class Residue: 

185 def __init__(self, data): 

186 self.id = data[0]+':'+data[1] 

187 self.atType = data[2] 

188 self.charg = float(data[3]) 

189 

190 

191class ResiduesDataLib: 

192 def __init__(self, fname): 

193 self.RData = {} 

194 with open(fname) as fh: 

195 for line in fh: 

196 if line[0] == '#': 

197 continue 

198 data = line.split() 

199 r = Residue(data) 

200 self.RData[r.id] = r 

201 self.nres = len(self.RData) 

202 

203 def getParams(self, resid, atid): 

204 if resid+':'+atid in self.RData: 

205 return self.RData[resid+':'+atid] 

206 else: 

207 print("WARNING: atom not found in library (", resid+':'+atid, ')') 

208 return {} 

209 

210 

211def get_pdb_charges(input_pdb_filename: str, residue_library_path: Optional[str] = None) -> list: 

212 if not residue_library_path: 

213 residue_library_path = str(Path(__file__).parent.joinpath("dat", "aa.lib").resolve()) 

214 

215 aaLib = ResiduesDataLib(residue_library_path) 

216 print("{} residue/atom pairs loaded from {}".format(aaLib.nres, residue_library_path)) 

217 

218 with open(input_pdb_filename) as inPDB: 

219 charges_list = [] 

220 residue_num = None 

221 for line in inPDB: 

222 line = line.rstrip() 

223 if not re.match('^ATOM', line) and not re.match('^HETATM', line): 

224 continue 

225 

226 nomat = line[12:16] 

227 if re.match('^[1-9]', nomat): 

228 nomat = nomat[1:4] + nomat[:1] 

229 nomat = nomat.replace(' ', '') 

230 nomr = line[17:21].replace(' ', '') 

231 # WARNING: Temporal totally uninformed assumption by PA 

232 if nomr == "HIS": 

233 nomr = "HID" 

234 if residue_num != line[23:27]: 

235 print(f"WARNING replacing HIS:{line[23:27]} by HID") 

236 residue_num = line[23:27] 

237 # Thats not correct REVIEW this should be done for all the atoms in the residue 

238 # not just the oxigen 

239 if nomat == "OXT": 

240 nomr = nomr + "C" 

241 print(f"WARNING replacing {nomr[:-1]}:{line[23:27]} by {nomr}") 

242 ###################################################### 

243 parms = aaLib.getParams(nomr, nomat) 

244 charges_list.append(parms.charg) # type: ignore 

245 return charges_list 

246 

247 

248def get_pdb_cmip_elements_canonical(input_pdb_filename: str, residue_library_path: Optional[str] = None) -> list: 

249 if not residue_library_path: 

250 residue_library_path = str(Path(__file__).parent.joinpath("dat", "aa.lib").resolve()) 

251 

252 aaLib = ResiduesDataLib(residue_library_path) 

253 print("{} residue/atom pairs loaded from {}".format(aaLib.nres, residue_library_path)) 

254 

255 with open(input_pdb_filename) as inPDB: 

256 elements_list = [] 

257 residue_num = None 

258 for line in inPDB: 

259 line = line.rstrip() 

260 if not re.match('^ATOM', line) and not re.match('^HETATM', line): 

261 continue 

262 

263 nomat = line[12:16] 

264 if re.match('^[1-9]', nomat): 

265 nomat = nomat[1:4] + nomat[:1] 

266 nomat = nomat.replace(' ', '') 

267 nomr = line[17:21].replace(' ', '') 

268 # WARNING: Temporal totally uninformed assumption by PA 

269 if nomr == "HIS": 

270 nomr = "HID" 

271 if residue_num != line[23:27]: 

272 print(f"WARNING replacing HIS:{line[23:27]} by HID") 

273 residue_num = line[23:27] 

274 # Thats not correct REVIEW this should be done for all the atoms in the residue 

275 # not just the oxigen 

276 if nomat == "OXT": 

277 nomr = nomr + "C" 

278 print(f"WARNING replacing {nomr[:-1]}:{line[23:27]} by {nomr}") 

279 ###################################################### 

280 parms = aaLib.getParams(nomr, nomat) 

281 elements_list.append(parms.atType) # type: ignore 

282 return elements_list 

283 

284 

285def get_pdb_total_charge(pdb_file_path: str) -> float: 

286 # Biopython 1.9 does not capture charge of atoms in CMIP format 

287 # Should do it by hand 

288 total_charge: float = 0.0 

289 with open(pdb_file_path) as pdb_file: 

290 for line in pdb_file: 

291 if line[0:6].strip().upper() in ["ATOM", "HETATM"] and len(line) > 63: 

292 total_charge += float(line[55:63].strip()) 

293 return total_charge 

294 

295 

296def probe_params_grid(probe_id: int = 0, readgrid: int = 2, pbfocus: int = 1, perfill: float = 0.6, 

297 grid_int: tuple[float, float, float] = (0.5, 0.5, 0.5)) -> dict[str, str]: 

298 grid_dict = {} 

299 grid_dict[f"readgrid{probe_id}"] = f"{readgrid}" 

300 grid_dict[f"perfill{probe_id}"] = f"{perfill}" 

301 grid_dict['pbfocus'] = f"{pbfocus}" 

302 grid_dict['grid_int'] = f"INTX{probe_id}={grid_int[0]},INTY{probe_id}={grid_int[1]},INTZ{probe_id}={grid_int[2]}" 

303 

304 return grid_dict 

305 

306 

307def params_grid(grid_type: str, readgrid: int = 0, perfill: float = 0.8, 

308 grid_int: tuple[float, float, float] = (0.5, 0.5, 0.5), 

309 grid_dim: tuple[float, float, float] = (64, 64, 64), 

310 grid_cen: tuple[float, float, float] = (0.0, 0.0, 0.0)) -> dict[str, str]: 

311 # grid_type older readgrid equivalences: 

312 # 2 proteina dist minima pecentatge, 4 distancia minima prot, 5 distancia al centre de masses 

313 # 1 

314 # interaction = 0 , 3 explicita grid d'entrada 

315 # cmip, titration, pbsolvation = 2, >3 

316 

317 grid_dict = {} 

318 grid_dict["readgrid"] = f"{readgrid}" 

319 

320 if grid_type in ['pb_interaction_energy', 'mip_pos', 'mip_neu', 'mip_neg', 'docking']: 

321 grid_dict['grid_cen'] = f"CENX={grid_cen[0]},CENY={grid_cen[1]},CENZ={grid_cen[2]}" 

322 grid_dict['grid_dim'] = f"DIMX={grid_dim[0]},DIMY={grid_dim[1]},DIMZ={grid_dim[2]}" 

323 grid_dict['grid_int'] = f"INTX={grid_int[0]},INTY={grid_int[1]},INTZ={grid_int[2]}" 

324 elif grid_type in ['solvation', 'titration']: 

325 grid_dict['perfill'] = f"{perfill}" 

326 grid_dict['grid_int'] = f"INTX={grid_int[0]},INTY={grid_int[1]},INTZ={grid_int[2]}" 

327 

328 return grid_dict 

329 

330 

331def params_preset(execution_type: str) -> dict[str, str]: 

332 params_dict = {} 

333 grid_dict = {} 

334 probe_grid_dict = {} 

335 execution_type = execution_type.strip() 

336 if execution_type == 'titration': 

337 grid_dict = params_grid(grid_type=execution_type, readgrid=2, perfill=0.8, grid_int=(0.5, 0.5, 0.5)) 

338 params_dict = { 

339 'title': 'Titration', 

340 'tipcalc': 1, 

341 'calcgrid': 1, 

342 'irest': 0, 

343 'orest': 0, 

344 'coorfmt': 2, 

345 'dields': 2, 

346 'titration': 1, 'inifoc': 2, 'cutfoc': -0.5, 'focus': 1, 'ninter': 10, 'clhost': 1, 'titcut': 20., 

347 'titwat': 10, 'titip': 10, 'titim': 10 

348 } 

349 elif execution_type == 'mip_pos': 

350 grid_dict = params_grid(grid_type=execution_type, readgrid=2) 

351 params_dict = { 

352 'title': 'MIP positive probe', 

353 'tipcalc': 0, 

354 'calcgrid': 1, 

355 'irest': 0, 

356 'orest': 0, 

357 'coorfmt': 2, 

358 'dields': 2, 

359 'cubeoutput': 1, 

360 'fvdw': 0.8, 

361 'carmip': 1, 

362 'tipatmip': "'OW'" 

363 } 

364 elif execution_type == 'mip_neu': 

365 grid_dict = params_grid(grid_type=execution_type, readgrid=2) 

366 params_dict = { 

367 'title': 'MIP neutral probe', 

368 'tipcalc': 0, 

369 'calcgrid': 1, 

370 'irest': 0, 

371 'orest': 0, 

372 'coorfmt': 2, 

373 'dields': 2, 

374 'cubeoutput': 1, 

375 'fvdw': 0.8, 

376 'carmip': 0, 

377 'tipatmip': "'OW'" 

378 } 

379 elif execution_type == 'mip_neg': 

380 grid_dict = params_grid(grid_type=execution_type, readgrid=2) 

381 params_dict = { 

382 'title': 'MIP negative probe', 

383 'tipcalc': 0, 

384 'calcgrid': 1, 

385 'irest': 0, 

386 'orest': 0, 

387 'coorfmt': 2, 

388 'dields': 2, 

389 'cubeoutput': 1, 

390 'fvdw': 0.8, 

391 'carmip': -1, 

392 'tipatmip': "'OW'" 

393 } 

394 # TODO 'carmip': 1, 

395 # wat: tipcalc: 1 + titration: 'inifoc': 2, 'cutfoc': -0.5, 'focus': 1, 'ninter': 10, 

396 elif execution_type == 'solvation': 

397 grid_dict = params_grid(grid_type=execution_type, readgrid=2, perfill=0.2, 

398 grid_int=(0.5, 0.5, 0.5)) 

399 params_dict = { 

400 'title': 'Solvation & MEP', 

401 'tipcalc': 0, 

402 'calcgrid': 1, 

403 'irest': 0, 

404 'orest': 0, 

405 'coorfmt': 2, 

406 'cubeoutput': 1, 'vdw': 0, 'pbelec': 1, 

407 'novdwgrid': 1, 'solvenergy': 1, 'dielc': 1, 'dielsol': 80 

408 } 

409 

410 elif execution_type == 'pb_interaction_energy': 

411 grid_dict = params_grid(grid_type=execution_type, readgrid=2) 

412 probe_grid_dict = probe_params_grid(probe_id=0, readgrid=2, pbfocus=1, perfill=0.6, 

413 grid_int=(1.5, 1.5, 1.5)) 

414 

415 # TODO Check for external box file or parameters 

416 params_dict = { 

417 'title': 'Docking Interaction energy calculation. PB electrostatics', 

418 'tipcalc': 3, 

419 'calcgrid': 1, 

420 'irest': 0, 

421 'orest': 0, 

422 'coorfmt': 2, 

423 'fvdw': 0.8, 'pbelec': 1, 'pbinic': 2, 'wgp': 0, 'ebyatom': 1 

424 } 

425 

426 elif execution_type == 'docking': 

427 grid_dict = params_grid(grid_type=execution_type, readgrid=2) 

428 

429 params_dict = { 

430 'title': 'Docking Mehler Solmajer dielectric', 

431 'tipcalc': 2, 

432 'calcgrid': 1, 

433 'irest': 0, 

434 'orest': 1, 

435 'coorfmt': 2, 

436 'fvdw': 0.8, 'dields': 2, 'focus': 1, 'cutfoc': 100, 

437 'tiprot': 5, 'inifoc': 5, 'ninter': 20, 

438 'clhost': 1, 'minout': 50, 'splitpdb': 0 

439 } 

440 

441 elif execution_type == 'docking_rst': 

442 params_dict = { 

443 'title': 'Docking from restart file', 

444 'readgrid': 0, 

445 'tipcalc': 2, 

446 'calcgrid': 1, 

447 'irest': 2, 

448 'orest': 1, 

449 'coorfmt': 2, 

450 'fvdw': 0.8, 'dields': 2, 'focus': 1, 'cutfoc': 100, 

451 'tiprot': 5, 'inifoc': 5, 'ninter': 20, 

452 'clhost': 1, 'minout': 50, 'splitpdb': 0, 'cutelec': 10.0 

453 } 

454 elif execution_type == 'check_only': 

455 params_dict = { 

456 'title': 'Check_only dry run of CMIP', 

457 'CHECKONLY': 1, 

458 'readgrid': 2, 

459 'calcgrid': 1, 

460 'tipcalc': 0, 

461 'irest': 0, 

462 'ebyatom': 1, 

463 'coorfmt': 2, 

464 'fvdw': 0.8 

465 } 

466 

467 return {**params_dict, **grid_dict, **probe_grid_dict} # type: ignore 

468 

469 

470def read_params_file(input_params_path: str) -> dict[str, str]: 

471 params_dict = {} 

472 with open(input_params_path) as input_params_file: 

473 params_dict['title'] = input_params_file.readline() 

474 for line in input_params_file: 

475 line = line.replace(' ', '') 

476 if line.startswith('&'): 

477 continue 

478 param_list = line.split(',') 

479 for param in param_list: 

480 param_key, param_value = param.split("=") 

481 

482 # Grid Values 

483 if len(param_key) > 3 and param_key[:3].startswith('INT'): 

484 if params_dict.get('grid_int'): 

485 params_dict['grid_int'] += f",{param_key}={param_value}" 

486 else: 

487 params_dict['grid_int'] = f"{param_key}={param_value}" 

488 elif len(param_key) > 3 and param_key[:3].startswith('CEN'): 

489 if params_dict.get('grid_cen'): 

490 params_dict['grid_cen'] += f",{param_key}={param_value}" 

491 else: 

492 params_dict['grid_cen'] = f"{param_key}={param_value}" 

493 elif len(param_key) > 3 and param_key[:3].startswith('DIM'): 

494 if params_dict.get('grid_dim'): 

495 params_dict['grid_dim'] += f",{param_key}={param_value}" 

496 else: 

497 params_dict['grid_dim'] = f"{param_key}={param_value}" 

498 # Rest of parameters 

499 else: 

500 params_dict[param_key] = param_value 

501 return params_dict 

502 

503 

504def write_params_file(output_params_path: str, params_dict: dict[str, str]) -> str: 

505 with open(output_params_path, 'w') as output_params_file: 

506 output_params_file.write(f"{params_dict.pop('title', 'Untitled')}\n") 

507 output_params_file.write("&cntrl\n") 

508 for params_key, params_value in params_dict.items(): 

509 if params_key in ['grid_int', 'grid_cen', 'grid_dim', 'grid_int0', 'grid_cen0', 'grid_dim0']: 

510 output_params_file.write(f" {params_value}\n") 

511 else: 

512 output_params_file.write(f" {params_key} = {params_value}\n") 

513 output_params_file.write("&end\n") 

514 return output_params_path 

515 

516 

517def create_params_file(output_params_path: str, input_params_path: Optional[str] = None, 

518 params_preset_dict: Optional[dict] = None, params_properties_dict: Optional[dict] = None) -> str: 

519 """ Gets a params dictionary and a presset and returns the path of the created params file for cmip. 

520 

521 Args: 

522 

523 

524 Returns: 

525 str: params file path. 

526 """ 

527 params_dict = {} 

528 

529 if params_preset_dict: 

530 for k, v in params_preset_dict.items(): 

531 params_dict[k] = v 

532 if input_params_path: 

533 input_params_dict = read_params_file(input_params_path) 

534 for k, v in input_params_dict.items(): 

535 params_dict[k] = v 

536 if params_properties_dict: 

537 for k, v in params_properties_dict.items(): 

538 params_dict[k] = v 

539 

540 return write_params_file(output_params_path, params_dict) 

541 

542 

543def mark_residues(residue_list: list[str], input_cmip_pdb_path: str, output_cmip_pdb_path: str, out_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None) -> None: 

544 """Marks using an "X" before the atom type all the residues in *residue_list* and writes the result in *output_cmip_pdb_path*. 

545 

546 Args: 

547 residue_list (list): Residue list in the format "Chain:Resnum" (no spaces between the elements) separated by commas. If empty or none all residues will be marked. 

548 local_log (:obj:`logging.Logger`): local log object. 

549 global_log (:obj:`logging.Logger`): global log object. 

550 """ 

551 if not residue_list: 

552 fu.log("Empty residue_list all residues will be marked", out_log, global_log) 

553 else: 

554 fu.log(f"Residue list: {residue_list}", out_log, global_log) 

555 

556 with open(input_cmip_pdb_path) as pdb_file_in, open(output_cmip_pdb_path, 'w') as pdb_file_out: 

557 residue_set_used = set() 

558 

559 res_counter = 0 

560 for line in pdb_file_in: 

561 if _is_atom(line): 

562 residue_code = _get_residue_code(line) 

563 used_residue = _get_residue_code_in_list(residue_code, residue_list) 

564 if not residue_list or used_residue: 

565 res_counter += 1 

566 residue_set_used.add(used_residue) 

567 line = _mark_pdb_atom(line) 

568 pdb_file_out.write(line) 

569 fu.log(f"{res_counter} residues have been marked", out_log, global_log) 

570 

571 if residue_list: 

572 unused_residues = set(residue_list) - residue_set_used 

573 if unused_residues: 

574 fu.log(f"The following residues where present in the residue_list and have not been marked: {unused_residues}", out_log, global_log) 

575 

576 

577def _mark_pdb_atom(line: str) -> str: 

578 newline = list(line) 

579 newline.insert(64, 'X') 

580 return ''.join(newline) 

581 

582 

583def _get_residue_code_in_list(input_residue, residue_list): 

584 if not residue_list: 

585 return None 

586 for res_code in residue_list: 

587 if input_residue == res_code: 

588 return res_code 

589 chain_rescode, resnum_rescode = res_code.split(":") 

590 chain_input, resnum_input = input_residue.split(":") 

591 if not chain_rescode: 

592 if resnum_rescode == resnum_input: 

593 return res_code 

594 if not resnum_rescode: 

595 if chain_rescode == chain_input: 

596 return res_code 

597 return None 

598 

599 

600def _get_residue_code(line: str) -> str: 

601 return _get_chain(line)+":"+_get_resnum(line) 

602 

603 

604def _get_chain(line: str) -> str: 

605 return line[21].upper() 

606 

607 

608def _get_resnum(line: str) -> str: 

609 return line[22:27].strip() 

610 

611 

612def _is_atom(line: str) -> bool: 

613 return line[0:6].strip().upper() in ["ATOM", "HETATM"]