Coverage for biobb_cp2k/cp2k/cp2k_prep.py: 70%
285 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 13:23 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 13:23 +0000
1#!/usr/bin/env python3
3"""Module containing the Cp2kPrep class and the command line interface."""
4import argparse
5from typing import Optional
6from typing import Any
7import os
8import collections.abc
9from pathlib import Path
10from biobb_common.generic.biobb_object import BiobbObject
11from biobb_common.configuration import settings
12from biobb_common.tools import file_utils as fu
13from biobb_common.tools.file_utils import launchlogger
14from biobb_cp2k.cp2k.common import check_input_path, check_output_path
17class Cp2kPrep(BiobbObject):
18 """
19 | biobb_cp2k Cp2kPrep
20 | Helper bb to prepare inputs for the `CP2K QM tool <https://www.cp2k.org/>`_ module.
21 | Prepares input files for the CP2K QM tool.
23 Args:
24 input_inp_path (str) (Optional): Input configuration file (CP2K run options). File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/cp2k_energy.inp>`_. Accepted formats: pdb (edam:format_1476).
25 input_pdb_path (str) (Optional): Input PDB file. File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/H2O_box.pdb>`_. Accepted formats: pdb (edam:format_1476).
26 input_rst_path (str) (Optional): Input restart file (WFN). File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/cp2k.wfn>`_. Accepted formats: wfn (edam:format_2333).
27 output_inp_path (str): Output CP2K input configuration file. File type: output. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/reference/cp2k/cp2k_prep_out.inp>`_. Accepted formats: inp (edam:format_2330), in (edam:format_2330), txt (edam:format_2330).
28 properties (dict - Python dictionary object containing the tool parameters, not input/output files):
29 * **simulation_type** (*str*) - ("energy") Default options for the cp2k_in file. Each creates a different inp file. Values: `energy <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_energy.inp>`_ (Computes Energy and Forces), `geom_opt <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_geom_opt.inp>`_ (Runs a geometry optimization), `md <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_md.inp>`_ (Runs an MD calculation), `mp2 <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_mp2.inp>`_ (Runs an MP2 calculation).
30 * **cp2k_in** (*dict*) - ({}) CP2K run options specification.
31 * **cell_cutoff** (*float*) - (5.0) CP2K cell cutoff, to build the cell around the system (only used if input_pdb_path is defined).
32 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
33 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
34 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
36 Examples:
37 This is a use example of how to use the building block from Python::
39 from biobb_cp2k.cp2k.cp2k_prep import cp2k_prep
40 prop = {
41 'simulation_type': 'geom_opt'
42 }
43 cp2k_prep(input_pdb_path='/path/to/input.pdb',
44 input_inp_path='/path/to/cp2k_in.inp',
45 output_inp_path='/path/to/cp2k_out.inp',
46 properties=prop)
48 Info:
49 * wrapped_software:
50 * name: In house
51 * license: Apache-2.0
52 * ontology:
53 * name: EDAM
54 * schema: http://edamontology.org/EDAM.owl
56 """
58 def __init__(self, output_inp_path: str,
59 input_pdb_path: Optional[str] = None, input_inp_path: Optional[str] = None, input_rst_path: Optional[str] = None,
60 properties: Optional[dict] = None, **kwargs) -> None:
62 properties = properties or {}
64 # Call parent class constructor
65 super().__init__(properties)
66 self.locals_var_dict = locals().copy()
68 # Input/Output files
69 self.io_dict = {
70 'in': {'input_pdb_path': input_pdb_path,
71 'input_inp_path': input_inp_path,
72 'input_rst_path': input_rst_path},
73 'out': {'output_inp_path': output_inp_path}
74 }
76 # Properties specific for BB
77 self.properties = properties
78 self.simulation_type = properties.get('simulation_type', "energy")
79 self.cell_cutoff = properties.get('cell_cutoff', 5.0)
80 self.cp2k_in = properties.get('cp2k_in', dict())
81 # self.cp2k_in = {k: str(v) for k, v in properties.get('cp2k_in', dict()).items()}
83 # Check the properties
84 self.check_properties(properties)
85 self.check_arguments()
87 def iterdict(self, d, depth, fileout_h):
88 for k, v in d.items():
89 if k.upper() == "FORCE_EVAL" or k.upper() == "MOTION":
90 depth = 0
91 elif "-" in k:
92 k = k.split("-")[0]
93 if isinstance(v, dict):
94 depth = depth+1
95 if 'name' in v.keys():
96 print(' ' * depth + "&" + k.upper(), v['name'], file=fileout_h)
97 else:
98 print(' ' * depth + "&" + k.upper(), file=fileout_h)
99 self.iterdict(v, depth, fileout_h)
100 print(' ' * depth + "&END", k.upper(), file=fileout_h)
101 depth = depth-1
102 else:
103 if k.isnumeric():
104 print(' ' * depth, v, file=fileout_h)
105 elif isinstance(v, list):
106 if not isinstance(v[0], dict):
107 print(' ' * depth, k, ' '.join(v), file=fileout_h)
108 elif isinstance(v[0], dict):
109 depth = depth+1
110 if k.upper() == 'KIND':
111 for atom in v:
112 print(' ' * depth + "&" + k.upper(), atom['name'], file=fileout_h)
113 self.iterdict(atom, depth, fileout_h)
114 print(' ' * depth + "&END", k.upper(), file=fileout_h)
116 elif k.upper() == 'COORD':
117 print(' ' * depth + "&" + k.upper(), file=fileout_h)
118 for atom in v:
119 self.iterdict(atom, depth, fileout_h)
120 else:
121 print(' ' * depth + "&" + k.upper(), file=fileout_h)
123 if k.upper() != 'KIND':
124 print(' ' * depth + "&END", k.upper(), file=fileout_h)
125 depth = depth-1
127 elif k != 'name':
128 print(' ' * depth, k.upper(), v, file=fileout_h)
130 # global dict3 = {}
131 def parse_rec_def(self, cp2k_in_array: list[Any], index: int, stop: str) -> dict[Any, Any]:
132 dict_var: dict[Any, Any] = {}
133 dict_var2: dict[Any, Any] = {}
134 depth = 0
135 rec = False
136 for line in cp2k_in_array[index:]:
137 index = index+1
138 if line.startswith('#') or not line.strip():
139 continue
141 if 'END' in line:
142 depth = depth - 1
143 vals = line.lstrip().split()
145 if depth < 0:
146 return dict_var
147 elif '&' in line:
148 depth = depth + 1
149 if depth == 1:
150 vals = line.lstrip().split()
151 key = vals[0].replace('&', '')
152 if (key == 'KIND'):
153 key_name = key + "-" + vals[1]
154 if dict_var.get(key):
155 dict_var[key].append(self.parse_rec_def(cp2k_in_array, index, key_name))
156 else:
157 dict_var[key] = []
158 dict_var[key].append(self.parse_rec_def(cp2k_in_array, index, key_name))
159 else:
160 rec = True
161 dict_var[key] = self.parse_rec_def(cp2k_in_array, index, key)
162 if len(vals) > 1 and key != 'KIND':
163 # print(stop + " Add dict_var[key]['name'] = " + str(vals[1].strip()))
164 dict_var[key]['name'] = vals[1].strip()
166 elif not rec:
167 vals = line.lstrip().split()
168 # print(stop + " Add dict_var[" + str(vals[0]) + "] = " + str(vals[1].strip()))
169 if (stop == 'COORD'):
170 if dict_var2.get('coords_list'):
171 dict_var2['coords_list'].append({vals[0]: vals[1:]})
172 else:
173 dict_var2['coords_list'] = []
174 dict_var2['coords_list'].append({vals[0]: vals[1:]})
176 dict_var = dict_var2['coords_list'] # type: ignore
177 elif (len(vals) == 2):
178 if (stop.startswith('KIND-')):
179 key2, name = stop.split('-')
180 dict_var['name'] = name
181 dict_var[vals[0]] = vals[1].strip()
182 else:
183 dict_var[vals[0]] = vals[1:]
185 return dict_var
187 def parse_pdb(self, pdb_file):
188 dict_var = {}
189 # coord = {}
190 coord = []
191 cell = {}
192 max_x = -999.999
193 max_y = -999.999
194 max_z = -999.999
195 min_x = 999.999
196 min_y = 999.999
197 min_z = 999.999
198 for line in open(pdb_file):
199 # ATOM 2 C7 JZ4 1 21.520 -27.270 -4.230 1.00 0.00
200 if line[0:4] == 'ATOM' or line[0:6] == 'HETATM':
201 # atom = line[12:16]
202 elem = line[77]
203 x = line[30:38]
204 y = line[38:46]
205 z = line[46:54]
206 if (float(x) > float(max_x)):
207 max_x = x
208 if (float(y) > float(max_y)):
209 max_y = y
210 if (float(z) > float(max_z)):
211 max_z = z
212 if (float(x) < float(min_x)):
213 min_x = x
214 if (float(y) < float(min_y)):
215 min_y = y
216 if (float(z) < float(min_z)):
217 min_z = z
218 # coord[elem] = [x,y,z]
219 # lcoord = []
220 coord.append({elem: [x, y, z]})
221 # coord[elem] = lcoord
223 box_x = float(max_x) - float(min_x)
224 box_y = float(max_y) - float(min_y)
225 box_z = float(max_z) - float(min_z)
227 box_x = float(f'{box_x:.3f}')
228 box_y = float(f'{box_y:.3f}')
229 box_z = float(f'{box_z:.3f}')
231 box_x = box_x + self.cell_cutoff
232 box_y = box_y + self.cell_cutoff
233 box_z = box_z + self.cell_cutoff
235 # cell['A'] = [str(box_x),'0.000','0.000']
236 # cell['B'] = ['0.000',str(box_y),'0.000']
237 # cell['C'] = ['0.000','0.000',str(box_z)]
239 cell['ABC'] = [str(box_x), str(box_y), str(box_z)]
241 dict_var['coord'] = coord
242 # dict_var['coords'] = coords
243 dict_var['cell'] = cell
245 return dict_var
247 def merge(self, a, b):
248 for key_b in b:
249 key_bu = key_b.upper()
250 if key_bu in (key_a.upper() for key_a in a):
251 for key_a in a:
252 key_au = key_a.upper()
253 if "-" in key_au:
254 key_au = key_au.split("-")[0]
255 if key_au == key_bu:
256 if isinstance(a[key_a], dict) and isinstance(b[key_b], dict):
257 self.merge(a[key_a], b[key_b])
258 elif isinstance(a[key_a], list) and isinstance(b[key_b], list):
259 if (key_au == 'KIND'):
260 for idxB, elemB in enumerate(b[key_b]):
261 done = False
262 for idxA, elemA in enumerate(a[key_a]):
263 if elemB['name'] == elemA['name']:
264 done = True
265 self.merge(a[key_a][idxA], b[key_b][idxB])
266 if not done:
267 a[key_a].append(b[key_b][idxB])
268 elif a[key_a] == b[key_b]:
269 pass # same leaf value
270 else:
271 a[key_a] = b[key_b]
272 else:
273 a[key_b] = b[key_b]
274 return a
276 def replace_coords(self, a, b):
277 # dict_var['force_eval'] = {'subsys' : {'coord' : coord } }
278 print("BioBB_CP2K, replacing coordinates...")
279 for key in a:
280 if key.upper() == 'FORCE_EVAL':
281 for key_2 in a[key]:
282 if key_2.upper() == 'SUBSYS':
283 if 'coord' in a[key][key_2]:
284 a[key][key_2]['coord'] = b['coord']
285 elif 'Coord' in a[key][key_2]:
286 a[key][key_2]['Coord'] = b['coord']
287 elif 'COORD' in a[key][key_2]:
288 a[key][key_2]['COORD'] = b['coord']
289 else:
290 a[key][key_2]['coord'] = b['coord']
292 if 'cell' in a[key][key_2]:
293 if 'ABC' in a[key][key_2]['cell']:
294 a[key][key_2]['cell']['ABC'] = b['cell']['ABC']
295 elif 'abc' in a[key][key_2]['cell']:
296 a[key][key_2]['cell']['abc'] = b['cell']['ABC']
297 elif 'Abc' in a[key][key_2]['cell']:
298 a[key][key_2]['cell']['Abc'] = b['cell']['ABC']
299 else:
300 a[key][key_2]['cell']['abc'] = b['cell']['ABC']
301 elif 'Cell' in a[key][key_2]:
302 if 'ABC' in a[key][key_2]['Cell']:
303 a[key][key_2]['Cell']['ABC'] = b['cell']['ABC']
304 elif 'abc' in a[key][key_2]['Cell']:
305 a[key][key_2]['Cell']['abc'] = b['cell']['ABC']
306 elif 'Abc' in a[key][key_2]['Cell']:
307 a[key][key_2]['Cell']['Abc'] = b['cell']['ABC']
308 else:
309 a[key][key_2]['Cell']['abc'] = b['cell']['ABC']
310 elif 'CELL' in a[key][key_2]:
311 if 'ABC' in a[key][key_2]['CELL']:
312 a[key][key_2]['CELL']['ABC'] = b['cell']['ABC']
313 elif 'abc' in a[key][key_2]['CELL']:
314 a[key][key_2]['CELL']['abc'] = b['cell']['ABC']
315 elif 'Abc' in a[key][key_2]['CELL']:
316 a[key][key_2]['CELL']['Abc'] = b['cell']['ABC']
317 else:
318 a[key][key_2]['CELL']['abc'] = b['cell']['ABC']
319 else:
320 a[key][key_2]['cell'] = b['cell']
321 return a
323 def check_data_params(self, out_log, out_err):
324 """ Checks input/output paths correctness """
326 # Check input(s)
327 self.io_dict["in"]["input_inp_path"] = check_input_path(self.io_dict["in"]["input_inp_path"], "input_inp_path", True, out_log, self.__class__.__name__)
328 self.io_dict["in"]["input_pdb_path"] = check_input_path(self.io_dict["in"]["input_pdb_path"], "input_pdb_path", True, out_log, self.__class__.__name__)
329 self.io_dict["in"]["input_rst_path"] = check_input_path(self.io_dict["in"]["input_rst_path"], "input_rst_path", True, out_log, self.__class__.__name__)
331 # Check output(s)
332 self.io_dict["out"]["output_inp_path"] = check_output_path(self.io_dict["out"]["output_inp_path"], "output_inp_path", False, out_log, self.__class__.__name__)
334 def update(self, d, u):
335 for k, v in u.items():
336 if isinstance(v, collections.abc.Mapping):
337 d[k] = self.update(d.get(k, {}), v)
338 else:
339 d[k] = v
340 return d
342 @launchlogger
343 def launch(self):
344 """Launches the execution of the Cp2kPrep module."""
346 # check input/output paths and parameters
347 self.check_data_params(self.out_log, self.err_log)
349 # Setup Biobb
350 if self.check_restart():
351 return 0
352 self.stage_files()
354 # Generating inp file
356 # Parsing the input PDB file (if any)
357 if self.io_dict["in"]["input_pdb_path"]:
358 coord = self.parse_pdb(self.io_dict["in"]["input_pdb_path"])
359 # print(coord)
360 # print(json.dumps(coord,indent=4))
362 # Parsing the input CP2K file (if any)
363 if self.io_dict["in"]["input_inp_path"] and self.simulation_type:
364 print("Incompatible inputs found: simulation_type [{0}] and input_inp_path [{1}].".format(self.simulation_type, self.io_dict['in']['input_inp_path']))
365 print("Will take just the input_inp_path.")
366 elif (self.simulation_type):
367 # path_cp2k_in = PurePath(myself.__file__).parent
368 path_cp2k_in = Path(os.getenv("CONDA_PREFIX", "")).joinpath('cp2k_aux')
369 if (self.simulation_type == 'energy'):
370 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_energy.inp"))
371 elif (self.simulation_type == 'geom_opt'):
372 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_geom_opt.inp"))
373 elif (self.simulation_type == 'md'):
374 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_md.inp"))
375 elif (self.simulation_type == 'mp2'):
376 self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_mp2.inp"))
377 else:
378 fu.log(self.__class__.__name__ + ': ERROR: Simulation type %s not defined' % self.simulation_type, self.out_log)
379 raise SystemExit(self.__class__.__name__ + ': ERROR: Simulation type %s not defined' % self.simulation_type)
380 else:
381 print("ERROR: Neither simulation type nor input_inp_path were defined.")
383 if self.io_dict["in"]["input_inp_path"]:
384 cp2k_in_array = []
385 with open(self.io_dict["in"]["input_inp_path"], 'r') as cp2k_in_fh:
386 # inp_in = self.parse(cp2k_in_fh)
387 for line in cp2k_in_fh:
388 cp2k_in_array.append(line)
389 self.inp_in = self.parse_rec_def(cp2k_in_array, 0, 'Stop')
390 # print(json.dumps(self.inp_in,indent=4))
392 if self.io_dict["in"]["input_inp_path"] and self.cp2k_in:
393 final_dict = self.merge(self.inp_in, self.cp2k_in)
394 # final_dict = self.merge(self.cp2k_in,self.inp_in)
395 # print(json.dumps(final_dict,indent=4))
396 elif self.io_dict["in"]["input_inp_path"] and not self.cp2k_in:
397 final_dict = self.inp_in
398 # print(json.dumps(final_dict,indent=4))
399 elif self.cp2k_in and not self.io_dict["in"]["input_inp_path"]:
400 final_dict = self.cp2k_in
401 # print(json.dumps(final_dict,indent=4))
402 else:
403 print("HOUSTON....")
405 if self.io_dict["in"]["input_rst_path"]:
406 # new_dict={'FORCE_EVAL':{'DFT':{'WFN_RESTART_FILE_NAME': os.path.abspath(self.io_dict["in"]["input_rst_path"]), 'SCF' : {'SCF_GUESS':'RESTART'}}}}
407 new_dict = {'FORCE_EVAL': {'DFT': {'WFN_RESTART_FILE_NAME': Path(self.io_dict["in"]["input_rst_path"]).resolve(), 'SCF': {'SCF_GUESS': 'RESTART'}}}}
408 self.update(final_dict, new_dict)
409 # print(json.dumps(final_dict,indent=4))
411 final_dict2 = final_dict
412 if self.io_dict["in"]["input_pdb_path"]:
413 final_dict2 = self.replace_coords(final_dict, coord)
415 # print(json.dumps(final_dict,indent=4))
417 with open(self.io_dict["out"]["output_inp_path"], 'w') as cp2k_out_fh:
418 self.iterdict(final_dict2, 0, cp2k_out_fh)
420 # self.tmp_files.extend([
421 # self.stage_io_dict.get("unique_dir", "")
422 # ])
423 self.remove_tmp_files()
425 self.check_arguments(output_files_created=True, raise_exception=False)
427 return 0
430def cp2k_prep(output_inp_path: str,
431 input_inp_path: Optional[str] = None, input_pdb_path: Optional[str] = None, input_rst_path: Optional[str] = None,
432 properties: Optional[dict] = None, **kwargs) -> int:
433 """Create :class:`Cp2kPrep <cp2k.cp2k_prep.Cp2kPrep>`cp2k.cp2k_prep.Cp2kPrep class and
434 execute :meth:`launch() <cp2k.cp2k_prep.Cp2kPrep.launch>` method"""
436 return Cp2kPrep(input_inp_path=input_inp_path,
437 input_pdb_path=input_pdb_path,
438 input_rst_path=input_rst_path,
439 output_inp_path=output_inp_path,
440 properties=properties).launch()
442 cp2k_prep.__doc__ = Cp2kPrep.__doc__
445def main():
446 parser = argparse.ArgumentParser(description='Prepares input files for the CP2K QM tool.', formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
447 parser.add_argument('--config', required=False, help='Configuration file')
449 # Specific args
450 required_args = parser.add_argument_group('required arguments')
451 required_args.add_argument('--output_inp_path', required=True, help='Output CP2K input inp file. Accepted formats: inp, in, txt.')
452 parser.add_argument('--input_inp_path', required=False, help='Input configuration file (QM options) (CP2K inp). Accepted formats: inp, in, txt.')
453 parser.add_argument('--input_pdb_path', required=False, help='Input PDB file. Accepted formats: pdb.')
454 parser.add_argument('--input_rst_path', required=False, help='Input Restart file (WFN). Accepted formats: wfn.')
456 args = parser.parse_args()
457 # config = args.config if args.config else None
458 args.config = args.config or "{}"
459 # properties = settings.ConfReader(config=config).get_prop_dic()
460 properties = settings.ConfReader(config=args.config).get_prop_dic()
462 # Specific call
463 cp2k_prep(input_inp_path=args.input_inp_path,
464 input_pdb_path=args.input_pdb_path,
465 input_rst_path=args.input_rst_path,
466 output_inp_path=args.output_inp_path,
467 properties=properties)
470if __name__ == '__main__':
471 main()