Coverage for biobb_cp2k / cp2k / cp2k_prep.py: 73%

273 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-05 12:58 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the Cp2kPrep class and the command line interface.""" 

4from typing import Optional 

5from typing import Any 

6import os 

7import collections.abc 

8from pathlib import Path 

9from biobb_common.generic.biobb_object import BiobbObject 

10from biobb_common.tools import file_utils as fu 

11from biobb_common.tools.file_utils import launchlogger 

12from biobb_cp2k.cp2k.common import check_input_path, check_output_path 

13 

14 

15class Cp2kPrep(BiobbObject): 

16 """ 

17 | biobb_cp2k Cp2kPrep 

18 | Helper bb to prepare inputs for the `CP2K QM tool <https://www.cp2k.org/>`_ module. 

19 | Prepares input files for the CP2K QM tool. 

20 

21 Args: 

22 input_inp_path (str) (Optional): Input configuration file (CP2K run options). File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/cp2k_energy.inp>`_. Accepted formats: inp (edam:format_2330), in (edam:format_2330), txt (edam:format_2330), wfn (edam:format_2333). 

23 input_pdb_path (str) (Optional): Input PDB file. File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/H2O_box.pdb>`_. Accepted formats: pdb (edam:format_1476). 

24 input_rst_path (str) (Optional): Input restart file (WFN). File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/cp2k.wfn>`_. Accepted formats: wfn (edam:format_2333). 

25 output_inp_path (str): Output CP2K input configuration file. File type: output. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/reference/cp2k/cp2k_prep_out.inp>`_. Accepted formats: inp (edam:format_2330), in (edam:format_2330), txt (edam:format_2330). 

26 properties (dict - Python dictionary object containing the tool parameters, not input/output files): 

27 * **simulation_type** (*str*) - ("energy") Default options for the cp2k_in file. Each creates a different inp file. Values: `energy <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_energy.inp>`_ (Computes Energy and Forces), `geom_opt <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_geom_opt.inp>`_ (Runs a geometry optimization), `md <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_md.inp>`_ (Runs an MD calculation), `mp2 <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_mp2.inp>`_ (Runs an MP2 calculation). 

28 * **cp2k_in** (*dict*) - ({}) CP2K run options specification. 

29 * **cell_cutoff** (*float*) - (5.0) CP2K cell cutoff, to build the cell around the system (only used if input_pdb_path is defined). 

30 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

31 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

32 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

33 

34 Examples: 

35 This is a use example of how to use the building block from Python:: 

36 

37 from biobb_cp2k.cp2k.cp2k_prep import cp2k_prep 

38 prop = { 

39 'simulation_type': 'geom_opt' 

40 } 

41 cp2k_prep(input_pdb_path='/path/to/input.pdb', 

42 input_inp_path='/path/to/cp2k_in.inp', 

43 output_inp_path='/path/to/cp2k_out.inp', 

44 properties=prop) 

45 

46 Info: 

47 * wrapped_software: 

48 * name: In house 

49 * license: Apache-2.0 

50 * ontology: 

51 * name: EDAM 

52 * schema: http://edamontology.org/EDAM.owl 

53 

54 """ 

55 

56 def __init__(self, output_inp_path: str, 

57 input_pdb_path: Optional[str] = None, input_inp_path: Optional[str] = None, input_rst_path: Optional[str] = None, 

58 properties: Optional[dict] = None, **kwargs) -> None: 

59 

60 properties = properties or {} 

61 

62 # Call parent class constructor 

63 super().__init__(properties) 

64 self.locals_var_dict = locals().copy() 

65 

66 # Input/Output files 

67 self.io_dict = { 

68 'in': {'input_pdb_path': input_pdb_path, 

69 'input_inp_path': input_inp_path, 

70 'input_rst_path': input_rst_path}, 

71 'out': {'output_inp_path': output_inp_path} 

72 } 

73 

74 # Properties specific for BB 

75 self.properties = properties 

76 self.simulation_type = properties.get('simulation_type', "energy") 

77 self.cell_cutoff = properties.get('cell_cutoff', 5.0) 

78 self.cp2k_in = properties.get('cp2k_in', dict()) 

79 # self.cp2k_in = {k: str(v) for k, v in properties.get('cp2k_in', dict()).items()} 

80 

81 # Check the properties 

82 self.check_properties(properties) 

83 self.check_arguments() 

84 

85 def iterdict(self, d, depth, fileout_h): 

86 for k, v in d.items(): 

87 if k.upper() == "FORCE_EVAL" or k.upper() == "MOTION": 

88 depth = 0 

89 elif "-" in k: 

90 k = k.split("-")[0] 

91 if isinstance(v, dict): 

92 depth = depth+1 

93 if 'name' in v.keys(): 

94 print(' ' * depth + "&" + k.upper(), v['name'], file=fileout_h) 

95 else: 

96 print(' ' * depth + "&" + k.upper(), file=fileout_h) 

97 self.iterdict(v, depth, fileout_h) 

98 print(' ' * depth + "&END", k.upper(), file=fileout_h) 

99 depth = depth-1 

100 else: 

101 if k.isnumeric(): 

102 print(' ' * depth, v, file=fileout_h) 

103 elif isinstance(v, list): 

104 if not isinstance(v[0], dict): 

105 print(' ' * depth, k, ' '.join(v), file=fileout_h) 

106 elif isinstance(v[0], dict): 

107 depth = depth+1 

108 if k.upper() == 'KIND': 

109 for atom in v: 

110 print(' ' * depth + "&" + k.upper(), atom['name'], file=fileout_h) 

111 self.iterdict(atom, depth, fileout_h) 

112 print(' ' * depth + "&END", k.upper(), file=fileout_h) 

113 

114 elif k.upper() == 'COORD': 

115 print(' ' * depth + "&" + k.upper(), file=fileout_h) 

116 for atom in v: 

117 self.iterdict(atom, depth, fileout_h) 

118 else: 

119 print(' ' * depth + "&" + k.upper(), file=fileout_h) 

120 

121 if k.upper() != 'KIND': 

122 print(' ' * depth + "&END", k.upper(), file=fileout_h) 

123 depth = depth-1 

124 

125 elif k != 'name': 

126 print(' ' * depth, k.upper(), v, file=fileout_h) 

127 

128 # global dict3 = {} 

129 def parse_rec_def(self, cp2k_in_array: list[Any], index: int, stop: str) -> dict[Any, Any]: 

130 dict_var: dict[Any, Any] = {} 

131 dict_var2: dict[Any, Any] = {} 

132 depth = 0 

133 rec = False 

134 for line in cp2k_in_array[index:]: 

135 index = index+1 

136 if line.startswith('#') or not line.strip(): 

137 continue 

138 

139 if 'END' in line: 

140 depth = depth - 1 

141 vals = line.lstrip().split() 

142 

143 if depth < 0: 

144 return dict_var 

145 elif '&' in line: 

146 depth = depth + 1 

147 if depth == 1: 

148 vals = line.lstrip().split() 

149 key = vals[0].replace('&', '') 

150 if (key == 'KIND'): 

151 key_name = key + "-" + vals[1] 

152 if dict_var.get(key): 

153 dict_var[key].append(self.parse_rec_def(cp2k_in_array, index, key_name)) 

154 else: 

155 dict_var[key] = [] 

156 dict_var[key].append(self.parse_rec_def(cp2k_in_array, index, key_name)) 

157 else: 

158 rec = True 

159 dict_var[key] = self.parse_rec_def(cp2k_in_array, index, key) 

160 if len(vals) > 1 and key != 'KIND': 

161 # print(stop + " Add dict_var[key]['name'] = " + str(vals[1].strip())) 

162 dict_var[key]['name'] = vals[1].strip() 

163 

164 elif not rec: 

165 vals = line.lstrip().split() 

166 # print(stop + " Add dict_var[" + str(vals[0]) + "] = " + str(vals[1].strip())) 

167 if (stop == 'COORD'): 

168 if dict_var2.get('coords_list'): 

169 dict_var2['coords_list'].append({vals[0]: vals[1:]}) 

170 else: 

171 dict_var2['coords_list'] = [] 

172 dict_var2['coords_list'].append({vals[0]: vals[1:]}) 

173 

174 dict_var = dict_var2['coords_list'] # type: ignore 

175 elif (len(vals) == 2): 

176 if (stop.startswith('KIND-')): 

177 key2, name = stop.split('-') 

178 dict_var['name'] = name 

179 dict_var[vals[0]] = vals[1].strip() 

180 else: 

181 dict_var[vals[0]] = vals[1:] 

182 

183 return dict_var 

184 

185 def parse_pdb(self, pdb_file): 

186 dict_var = {} 

187 # coord = {} 

188 coord = [] 

189 cell = {} 

190 max_x = -999.999 

191 max_y = -999.999 

192 max_z = -999.999 

193 min_x = 999.999 

194 min_y = 999.999 

195 min_z = 999.999 

196 for line in open(pdb_file): 

197 # ATOM 2 C7 JZ4 1 21.520 -27.270 -4.230 1.00 0.00 

198 if line[0:4] == 'ATOM' or line[0:6] == 'HETATM': 

199 # atom = line[12:16] 

200 elem = line[77] 

201 x = line[30:38] 

202 y = line[38:46] 

203 z = line[46:54] 

204 if (float(x) > float(max_x)): 

205 max_x = x 

206 if (float(y) > float(max_y)): 

207 max_y = y 

208 if (float(z) > float(max_z)): 

209 max_z = z 

210 if (float(x) < float(min_x)): 

211 min_x = x 

212 if (float(y) < float(min_y)): 

213 min_y = y 

214 if (float(z) < float(min_z)): 

215 min_z = z 

216 # coord[elem] = [x,y,z] 

217 # lcoord = [] 

218 coord.append({elem: [x, y, z]}) 

219 # coord[elem] = lcoord 

220 

221 box_x = float(max_x) - float(min_x) 

222 box_y = float(max_y) - float(min_y) 

223 box_z = float(max_z) - float(min_z) 

224 

225 box_x = float(f'{box_x:.3f}') 

226 box_y = float(f'{box_y:.3f}') 

227 box_z = float(f'{box_z:.3f}') 

228 

229 box_x = box_x + self.cell_cutoff 

230 box_y = box_y + self.cell_cutoff 

231 box_z = box_z + self.cell_cutoff 

232 

233 # cell['A'] = [str(box_x),'0.000','0.000'] 

234 # cell['B'] = ['0.000',str(box_y),'0.000'] 

235 # cell['C'] = ['0.000','0.000',str(box_z)] 

236 

237 cell['ABC'] = [str(box_x), str(box_y), str(box_z)] 

238 

239 dict_var['coord'] = coord 

240 # dict_var['coords'] = coords 

241 dict_var['cell'] = cell 

242 

243 return dict_var 

244 

245 def merge(self, a, b): 

246 for key_b in b: 

247 key_bu = key_b.upper() 

248 if key_bu in (key_a.upper() for key_a in a): 

249 for key_a in a: 

250 key_au = key_a.upper() 

251 if "-" in key_au: 

252 key_au = key_au.split("-")[0] 

253 if key_au == key_bu: 

254 if isinstance(a[key_a], dict) and isinstance(b[key_b], dict): 

255 self.merge(a[key_a], b[key_b]) 

256 elif isinstance(a[key_a], list) and isinstance(b[key_b], list): 

257 if (key_au == 'KIND'): 

258 for idxB, elemB in enumerate(b[key_b]): 

259 done = False 

260 for idxA, elemA in enumerate(a[key_a]): 

261 if elemB['name'] == elemA['name']: 

262 done = True 

263 self.merge(a[key_a][idxA], b[key_b][idxB]) 

264 if not done: 

265 a[key_a].append(b[key_b][idxB]) 

266 elif a[key_a] == b[key_b]: 

267 pass # same leaf value 

268 else: 

269 a[key_a] = b[key_b] 

270 else: 

271 a[key_b] = b[key_b] 

272 return a 

273 

274 def replace_coords(self, a, b): 

275 # dict_var['force_eval'] = {'subsys' : {'coord' : coord } } 

276 print("BioBB_CP2K, replacing coordinates...") 

277 for key in a: 

278 if key.upper() == 'FORCE_EVAL': 

279 for key_2 in a[key]: 

280 if key_2.upper() == 'SUBSYS': 

281 if 'coord' in a[key][key_2]: 

282 a[key][key_2]['coord'] = b['coord'] 

283 elif 'Coord' in a[key][key_2]: 

284 a[key][key_2]['Coord'] = b['coord'] 

285 elif 'COORD' in a[key][key_2]: 

286 a[key][key_2]['COORD'] = b['coord'] 

287 else: 

288 a[key][key_2]['coord'] = b['coord'] 

289 

290 if 'cell' in a[key][key_2]: 

291 if 'ABC' in a[key][key_2]['cell']: 

292 a[key][key_2]['cell']['ABC'] = b['cell']['ABC'] 

293 elif 'abc' in a[key][key_2]['cell']: 

294 a[key][key_2]['cell']['abc'] = b['cell']['ABC'] 

295 elif 'Abc' in a[key][key_2]['cell']: 

296 a[key][key_2]['cell']['Abc'] = b['cell']['ABC'] 

297 else: 

298 a[key][key_2]['cell']['abc'] = b['cell']['ABC'] 

299 elif 'Cell' in a[key][key_2]: 

300 if 'ABC' in a[key][key_2]['Cell']: 

301 a[key][key_2]['Cell']['ABC'] = b['cell']['ABC'] 

302 elif 'abc' in a[key][key_2]['Cell']: 

303 a[key][key_2]['Cell']['abc'] = b['cell']['ABC'] 

304 elif 'Abc' in a[key][key_2]['Cell']: 

305 a[key][key_2]['Cell']['Abc'] = b['cell']['ABC'] 

306 else: 

307 a[key][key_2]['Cell']['abc'] = b['cell']['ABC'] 

308 elif 'CELL' in a[key][key_2]: 

309 if 'ABC' in a[key][key_2]['CELL']: 

310 a[key][key_2]['CELL']['ABC'] = b['cell']['ABC'] 

311 elif 'abc' in a[key][key_2]['CELL']: 

312 a[key][key_2]['CELL']['abc'] = b['cell']['ABC'] 

313 elif 'Abc' in a[key][key_2]['CELL']: 

314 a[key][key_2]['CELL']['Abc'] = b['cell']['ABC'] 

315 else: 

316 a[key][key_2]['CELL']['abc'] = b['cell']['ABC'] 

317 else: 

318 a[key][key_2]['cell'] = b['cell'] 

319 return a 

320 

321 def check_data_params(self, out_log, out_err): 

322 """ Checks input/output paths correctness """ 

323 

324 # Check input(s) 

325 self.io_dict["in"]["input_inp_path"] = check_input_path(self.io_dict["in"]["input_inp_path"], "input_inp_path", True, out_log, self.__class__.__name__) 

326 self.io_dict["in"]["input_pdb_path"] = check_input_path(self.io_dict["in"]["input_pdb_path"], "input_pdb_path", True, out_log, self.__class__.__name__) 

327 self.io_dict["in"]["input_rst_path"] = check_input_path(self.io_dict["in"]["input_rst_path"], "input_rst_path", True, out_log, self.__class__.__name__) 

328 

329 # Check output(s) 

330 self.io_dict["out"]["output_inp_path"] = check_output_path(self.io_dict["out"]["output_inp_path"], "output_inp_path", False, out_log, self.__class__.__name__) 

331 

332 def update(self, d, u): 

333 for k, v in u.items(): 

334 if isinstance(v, collections.abc.Mapping): 

335 d[k] = self.update(d.get(k, {}), v) 

336 else: 

337 d[k] = v 

338 return d 

339 

340 @launchlogger 

341 def launch(self): 

342 """Launches the execution of the Cp2kPrep module.""" 

343 

344 # check input/output paths and parameters 

345 self.check_data_params(self.out_log, self.err_log) 

346 

347 # Setup Biobb 

348 if self.check_restart(): 

349 return 0 

350 self.stage_files() 

351 

352 # Generating inp file 

353 

354 # Parsing the input PDB file (if any) 

355 if self.io_dict["in"]["input_pdb_path"]: 

356 coord = self.parse_pdb(self.io_dict["in"]["input_pdb_path"]) 

357 # print(coord) 

358 # print(json.dumps(coord,indent=4)) 

359 

360 # Parsing the input CP2K file (if any) 

361 if self.io_dict["in"]["input_inp_path"] and self.simulation_type: 

362 print("Incompatible inputs found: simulation_type [{0}] and input_inp_path [{1}].".format(self.simulation_type, self.io_dict['in']['input_inp_path'])) 

363 print("Will take just the input_inp_path.") 

364 elif (self.simulation_type): 

365 # path_cp2k_in = PurePath(myself.__file__).parent 

366 path_cp2k_in = Path(os.getenv("CONDA_PREFIX", "")).joinpath('cp2k_aux') 

367 if (self.simulation_type == 'energy'): 

368 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_energy.inp")) 

369 elif (self.simulation_type == 'geom_opt'): 

370 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_geom_opt.inp")) 

371 elif (self.simulation_type == 'md'): 

372 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_md.inp")) 

373 elif (self.simulation_type == 'mp2'): 

374 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_mp2.inp")) 

375 else: 

376 fu.log(self.__class__.__name__ + ': ERROR: Simulation type %s not defined' % self.simulation_type, self.out_log) 

377 raise SystemExit(self.__class__.__name__ + ': ERROR: Simulation type %s not defined' % self.simulation_type) 

378 else: 

379 print("ERROR: Neither simulation type nor input_inp_path were defined.") 

380 

381 if self.io_dict["in"]["input_inp_path"]: 

382 cp2k_in_array = [] 

383 with open(self.io_dict["in"]["input_inp_path"], 'r') as cp2k_in_fh: 

384 # inp_in = self.parse(cp2k_in_fh) 

385 for line in cp2k_in_fh: 

386 cp2k_in_array.append(line) 

387 self.inp_in = self.parse_rec_def(cp2k_in_array, 0, 'Stop') 

388 # print(json.dumps(self.inp_in,indent=4)) 

389 

390 if self.io_dict["in"]["input_inp_path"] and self.cp2k_in: 

391 final_dict = self.merge(self.inp_in, self.cp2k_in) 

392 # final_dict = self.merge(self.cp2k_in,self.inp_in) 

393 # print(json.dumps(final_dict,indent=4)) 

394 elif self.io_dict["in"]["input_inp_path"] and not self.cp2k_in: 

395 final_dict = self.inp_in 

396 # print(json.dumps(final_dict,indent=4)) 

397 elif self.cp2k_in and not self.io_dict["in"]["input_inp_path"]: 

398 final_dict = self.cp2k_in 

399 # print(json.dumps(final_dict,indent=4)) 

400 else: 

401 print("HOUSTON....") 

402 

403 if self.io_dict["in"]["input_rst_path"]: 

404 # new_dict={'FORCE_EVAL':{'DFT':{'WFN_RESTART_FILE_NAME': os.path.abspath(self.io_dict["in"]["input_rst_path"]), 'SCF' : {'SCF_GUESS':'RESTART'}}}} 

405 new_dict = {'FORCE_EVAL': {'DFT': {'WFN_RESTART_FILE_NAME': Path(self.io_dict["in"]["input_rst_path"]).resolve(), 'SCF': {'SCF_GUESS': 'RESTART'}}}} 

406 self.update(final_dict, new_dict) 

407 # print(json.dumps(final_dict,indent=4)) 

408 

409 final_dict2 = final_dict 

410 if self.io_dict["in"]["input_pdb_path"]: 

411 final_dict2 = self.replace_coords(final_dict, coord) 

412 

413 # print(json.dumps(final_dict,indent=4)) 

414 

415 with open(self.io_dict["out"]["output_inp_path"], 'w') as cp2k_out_fh: 

416 self.iterdict(final_dict2, 0, cp2k_out_fh) 

417 

418 self.remove_tmp_files() 

419 

420 self.check_arguments(output_files_created=True, raise_exception=False) 

421 

422 return 0 

423 

424 

425def cp2k_prep(output_inp_path: str, 

426 input_inp_path: Optional[str] = None, input_pdb_path: Optional[str] = None, input_rst_path: Optional[str] = None, 

427 properties: Optional[dict] = None, **kwargs) -> int: 

428 """Create :class:`Cp2kPrep <cp2k.cp2k_prep.Cp2kPrep>`cp2k.cp2k_prep.Cp2kPrep class and 

429 execute :meth:`launch() <cp2k.cp2k_prep.Cp2kPrep.launch>` method""" 

430 return Cp2kPrep(**dict(locals())).launch() 

431 

432 

433cp2k_prep.__doc__ = Cp2kPrep.__doc__ 

434main = Cp2kPrep.get_main(cp2k_prep, "Prepares input files for the CP2K QM tool.") 

435 

436if __name__ == '__main__': 

437 main()