Coverage for biobb_analysis/ambertools/common.py: 65%
219 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 14:38 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 14:38 +0000
1""" Common functions for package biobb_analysis.ambertools """
2from pathlib import Path, PurePath
3import zipfile
4import shutil
5from biobb_common.tools import file_utils as fu
8def check_top_path(path, out_log, classname):
9 """ Checks topology input file """
10 orig_path = path
11 if not Path(path).exists():
12 fu.log(classname + ': Unexisting topology input file, exiting', out_log)
13 raise SystemExit(classname + ': Unexisting topology input file')
14 file_extension = PurePath(path).suffix
15 if not is_valid_topology(file_extension[1:]):
16 fu.log(classname + ': Format %s in topology input file is not compatible' % file_extension[1:], out_log)
17 raise SystemExit(classname + ': Format %s in topology input file is not compatible' % file_extension[1:])
18 if zipfile.is_zipfile(path):
19 top_file = fu.unzip_top(zip_file=path, out_log=out_log)
20 path = top_file
21 return path, orig_path
24def check_traj_path(path, out_log, classname):
25 """ Checks trajectory input file """
26 if not Path(path).exists():
27 fu.log(classname + ': Unexisting trajectory input file, exiting', out_log)
28 raise SystemExit(classname + ': Unexisting trajectory input file')
29 file_extension = PurePath(path).suffix
30 if not is_valid_trajectory(file_extension[1:]):
31 fu.log(classname + ': Format %s in trajectory input file is not compatible' % file_extension[1:], out_log)
32 raise SystemExit(classname + ': Format %s in trajectory input file is not compatible' % file_extension[1:])
33 return path
36def check_out_path(path, out_log, classname):
37 """ Checks if output folder exists """
38 if PurePath(path).parent and not Path(PurePath(path).parent).exists():
39 fu.log(classname + ': Unexisting output folder, exiting', out_log)
40 raise SystemExit(classname + ': Unexisting output folder')
41 return path
44def get_parameters(properties, type, classname, out_log):
45 """ Gets in_parameters and out_parameters """
46 if not properties.get(type, dict()):
47 fu.log('No %s parameters provided' % type, out_log)
48 return get_default_value(classname)[type]
50 return {k: v for k, v in properties.get(type, dict()).items()}
53def get_binary_path(properties, type):
54 """ Gets binary path """
55 return properties.get(type, get_default_value(type))
58def check_in_path(path, out_log, classname):
59 """ Checks input instructions file """
60 if not Path(path).exists():
61 fu.log(classname + ': Unexisting input instructions file, exiting', out_log)
62 raise SystemExit(classname + ': Unexisting input instructions file')
64 # check syntax for instructions file
65 syntax_instructions = True
66 err_instructions = ''
67 if 'parm ' not in open(path).read():
68 syntax_instructions = False
69 err_instructions += 'No topology provided'
70 if 'trajin ' not in open(path).read():
71 syntax_instructions = False
72 err_instructions += ' No input trajectory provided'
73 if not syntax_instructions:
74 fu.log(classname + ': Incorrect syntax for instructions file: %s, exiting' % err_instructions, out_log)
75 raise SystemExit(classname + ': Incorrect syntax for instructions file: %s, exiting' % err_instructions)
78def get_default_value(key):
79 """ Gives default values according to the given key """
80 default_values = {
81 "start": 1,
82 "end": -1,
83 "step": 1,
84 "snapshot": 1,
85 "format": "netcdf",
86 "mask": "all-atoms",
87 "reference": "first",
88 "average": "MyAvg",
89 "instructions_file": "instructions.in",
90 "binary_path": "cpptraj",
91 # default conf for Average
92 "Average": {
93 "in_parameters": {
94 "start": 1,
95 "end": -1,
96 "step": 1,
97 "mask": "all-atoms"
98 },
99 "out_parameters": {
100 "format": "pdb"
101 }
102 },
103 # default conf for Bfactor
104 "Bfactor": {
105 "in_parameters": {
106 "start": 1,
107 "end": -1,
108 "step": 1,
109 "mask": "all-atoms",
110 "reference": "first"
111 }
112 },
113 # default conf for Convert
114 "Convert": {
115 "in_parameters": {
116 "start": 1,
117 "end": -1,
118 "step": 1,
119 "mask": "all-atoms"
120 },
121 "out_parameters": {
122 "format": "netcdf"
123 }
124 },
125 # default conf for Dry
126 "Dry": {
127 "in_parameters": {
128 "start": 1,
129 "end": -1,
130 "step": 1,
131 "mask": "all-atoms"
132 },
133 "out_parameters": {
134 "format": "netcdf"
135 }
136 },
137 # default conf for Image
138 "Image": {
139 "in_parameters": {
140 "start": 1,
141 "end": -1,
142 "step": 1,
143 "mask": "all-atoms"
144 },
145 "out_parameters": {
146 "format": "netcdf"
147 }
148 },
149 # default conf for Mask
150 "Mask": {
151 "in_parameters": {
152 "start": 1,
153 "end": -1,
154 "step": 1,
155 "mask": "all-atoms"
156 },
157 "out_parameters": {
158 "format": "netcdf"
159 }
160 },
161 # default conf for Rgyr
162 "Rgyr": {
163 "in_parameters": {
164 "start": 1,
165 "end": -1,
166 "step": 1,
167 "mask": "all-atoms"
168 }
169 },
170 # default conf for Rms
171 "Rms": {
172 "in_parameters": {
173 "start": 1,
174 "end": -1,
175 "step": 1,
176 "mask": "all-atoms",
177 "reference": "first"
178 }
179 },
180 # default conf for Rmsf
181 "Rmsf": {
182 "in_parameters": {
183 "start": 1,
184 "end": -1,
185 "step": 1,
186 "mask": "all-atoms",
187 "reference": "first"
188 }
189 },
190 # default conf for Slice
191 "Slice": {
192 "in_parameters": {
193 "start": 1,
194 "end": -1,
195 "step": 1,
196 "mask": "all-atoms"
197 },
198 "out_parameters": {
199 "format": "netcdf"
200 }
201 },
202 # default conf for Snapshot
203 "Snapshot": {
204 "in_parameters": {
205 "snapshot": 12,
206 "mask": "all-atoms"
207 },
208 "out_parameters": {
209 "format": "pdb"
210 }
211 },
212 # default conf for Strip
213 "Strip": {
214 "in_parameters": {
215 "start": 1,
216 "end": -1,
217 "step": 1,
218 "mask": "all-atoms"
219 },
220 "out_parameters": {
221 "format": "netcdf"
222 }
223 }
224 }
226 return default_values[key]
229def is_valid_topology(ext):
230 """ Checks if trajectory format is compatible with Cpptraj """
231 formats = 'top', 'pdb', 'prmtop', 'parmtop', 'zip'
232 return ext in formats
235def is_valid_trajectory(traj):
236 """ Checks if trajectory format is compatible with Cpptraj """
237 formats = 'mdcrd', 'crd', 'cdf', 'netcdf', 'nc', 'restart', 'ncrestart', 'restartnc', 'dcd', 'charmm', 'cor', 'pdb', 'mol2', 'trr', 'gro', 'binpos', 'xtc', 'cif', 'arc', 'sqm', 'sdf', 'conflib'
238 return traj in formats
241def is_valid_reference(ref):
242 """ Checks if reference is correct """
243 references = 'first', 'average', 'experimental'
244 return ref in references
247def get_mask_atoms(key):
248 """ Gives mask atoms according to the given key """
249 masks = {
250 "c-alpha": "@CA",
251 "backbone": "@C,CA,N,O,C3',O3',C4',C5',O5',P",
252 "all-atoms": ":*",
253 "heavy-atoms": "!@H*,1H*,2H*,3H*",
254 "side-chain": "!@CA,C,N,O,H,HA,C3',O3',C4',C5',O5',P",
255 "solute": "!:WAT,HOH,SOL,TIP3,TP3,SOD,CLA,Na+,Cl-,NA,CL,K+,K",
256 "ions": ":SOD,CLA,Na+,Cl-,NA,CL,K+,K",
257 "solvent": ":WAT,HOH,SOL,TIP3,TP3"
258 }
260 # if key incorrect, return default value and message
261 if key in masks:
262 return masks[key], None
263 else:
264 return key, None # Allow for Amber mask
267def get_in_parameters(list, out_log, type='None'):
268 """ Return string with input parameters """
269 # if strip or mask, no mandatory trajin parameters
270 if type == 'strip' or type == 'mask':
271 start = '' if 'start' not in list else str(list['start'])
272 end = '' if 'end' not in list else str(list['end'])
273 step = '' if 'step' not in list else str(list['step'])
274 if (not start or start == 'None') and (not end or end == 'None') and (not step or step == 'None'):
275 return ''
276 else:
277 if not start:
278 start = str(get_default_value("start"))
279 fu.log('No start value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('start'), out_log)
280 if not end:
281 end = str(get_default_value("end"))
282 fu.log('No end value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('end'), out_log)
283 if not step:
284 step = str(get_default_value("step"))
285 fu.log('No step value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('step'), out_log)
286 else:
287 # check if trajin parameters are provided and have correct format
288 if (type == 'snapshot'):
289 snapshot = str(get_default_value("snapshot")) if 'snapshot' not in list else str(list['snapshot'])
290 if 'snapshot' not in list or not list['snapshot'] or not isinstance(list['snapshot'], int):
291 snapshot = str(get_default_value("snapshot"))
292 fu.log('No snapshot value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('snapshot'), out_log)
293 start = snapshot
294 end = snapshot
295 step = '1'
296 else:
297 start = str(get_default_value("start")) if 'start' not in list else str(list['start'])
298 if 'start' not in list or not list['start'] or not isinstance(list['start'], int):
299 start = str(get_default_value("start"))
300 fu.log('No start value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('start'), out_log)
301 end = str(get_default_value("end")) if 'end' not in list else str(list['end'])
302 if 'end' not in list or not list['end'] or not isinstance(list['end'], int):
303 end = str(get_default_value("end"))
304 fu.log('No end value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('end'), out_log)
305 step = str(get_default_value("step")) if 'step' not in list else str(list['step'])
306 if 'step' not in list or not list['step'] or not isinstance(list['step'], int):
307 step = str(get_default_value("step"))
308 fu.log('No step value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('step'), out_log)
310 # checking start <= end
311 if end != '-1' and start > end:
312 fu.log('End must be -1 (indicating the end of the trajectory) or more or equal than start. Your values are start: %s, end: %s' % (start, end), out_log)
313 raise SystemExit('End must be -1 (indicating the end of the trajectory) or greater or equal than start. Your values are start: %s, end: %s' % (start, end))
315 return start + " " + end + " " + step
318def setup_structure(out_log):
319 """ Sets up the structure """
320 instructions_list = []
321 mask_atoms = get_mask('heavy-atoms', out_log)
322 instructions_list.append('center ' + mask_atoms + ' origin')
323 instructions_list.append('autoimage')
324 instructions_list.append('rms first ' + mask_atoms)
325 mask_solvent = get_mask('solvent', out_log)
326 mask_ions = get_mask('ions', out_log)
327 instructions_list.append('strip ' + mask_solvent + ',' + mask_ions[1:])
329 return instructions_list
332def get_negative_mask(key, out_log):
333 """ Gives the negative mask according to the given key """
334 atoms, msg = get_mask_atoms(key)
335 if atoms[0] == '!':
336 mask = atoms[1:]
337 else:
338 mask = '!' + atoms
339 # if mask incorrect, give message
340 if msg:
341 fu.log(msg, out_log)
343 return mask
346def get_mask(key, out_log):
347 """ Gives mask according to the given key """
348 mask, msg = get_mask_atoms(key)
349 # if mask incorrect, give message
350 if msg:
351 fu.log(msg, out_log)
353 return mask
356def get_reference(ref, output_cpptraj_path, input_exp_path, mask, output, classname, out_log):
357 """ Gives reference instructions according to the given key """
358 instructions_list = []
359 if not ref or ref == 'None':
360 ref = get_default_value('reference')
361 fu.log('No reference provided in configuration file, assigned default value: %s' % get_default_value('reference'), out_log)
363 if not is_valid_reference(ref):
364 fu.log('Reference %s is not compatible, assigned default value: %s' % (ref, get_default_value('reference')), out_log)
365 ref = get_default_value('reference')
367 if ref == 'first':
368 if output:
369 instructions_list.append('rms first out ' + output_cpptraj_path)
370 else:
371 instructions_list.append('rms first')
373 if ref == 'average':
374 instructions_list.append('average crdset ' + get_default_value('average'))
375 instructions_list.append('run')
376 if output:
377 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask + ' out ' + output_cpptraj_path)
378 else:
379 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask)
381 if ref == 'experimental':
382 if not input_exp_path:
383 fu.log('No experimental structure provided, exiting', out_log)
384 raise SystemExit(classname + ': input_exp_path is mandatory')
385 instructions_list.append('parm ' + input_exp_path + ' noconect [exp]')
386 solute, msg = get_mask_atoms('solute')
387 instructions_list.append('reference ' + input_exp_path + ' ' + solute + ' parm [exp]')
388 backbone, msg = get_mask_atoms('backbone')
389 if output:
390 instructions_list.append('rms reference ' + mask + ' out ' + output_cpptraj_path)
391 else:
392 instructions_list.append('rms reference ' + mask)
394 return instructions_list
397def get_reference_rms(ref, output_cpptraj_path, input_exp_path, mask, output, classname, out_log, nofit=False, norotate=False, nomod=False):
398 """ Gives reference instructions according to the given key """
399 instructions_list = []
400 if not ref or ref == 'None':
401 ref = get_default_value('reference')
402 fu.log('No reference provided in configuration file, assigned default value: %s' % get_default_value('reference'), out_log)
404 if not is_valid_reference(ref):
405 fu.log('Reference %s is not compatible, assigned default value: %s' % (ref, get_default_value('reference')), out_log)
406 ref = get_default_value('reference')
408 flags = []
409 if nofit:
410 flags.append("nofit")
411 if norotate:
412 flags.append("norotate")
413 if nomod:
414 flags.append("nomod")
416 flags_str = " ".join(flags)
418 if ref == 'first':
419 if output:
420 instructions_list.append('rms first out ' + output_cpptraj_path + f' {flags_str}')
421 else:
422 instructions_list.append('rms first' + f' {flags_str}')
424 if ref == 'average':
425 instructions_list.append('average crdset ' + get_default_value('average'))
426 instructions_list.append('run')
427 if output:
428 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask + ' out ' + output_cpptraj_path + f' {flags_str}')
429 else:
430 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask + f' {flags_str}')
432 if ref == 'experimental':
433 if not input_exp_path:
434 fu.log('No experimental structure provided, exiting', out_log)
435 raise SystemExit(classname + ': input_exp_path is mandatory')
436 instructions_list.append('parm ' + input_exp_path + ' noconect [exp]')
437 solute, msg = get_mask_atoms('solute')
438 instructions_list.append('reference ' + input_exp_path + ' ' + solute + ' parm [exp]')
439 backbone, msg = get_mask_atoms('backbone')
440 if output:
441 instructions_list.append('rms reference ' + mask + ' out ' + output_cpptraj_path + f' {flags_str}')
442 else:
443 instructions_list.append('rms reference ' + mask + f' {flags_str}')
445 return instructions_list
448def get_out_parameters(list, out_log):
449 """ Return string with output parameters """
450 format = list['format']
451 # msg = None
452 # check if format provided
453 if not format:
454 format = get_default_value('format')
455 fu.log('No format provided in configuration file, assigned default value: %s' % get_default_value('format'), out_log)
456 # check if valid format
457 if not is_valid_trajectory(format):
458 fu.log('Format %s is not compatible, assigned default value: %s' % (format, get_default_value('format')), out_log)
459 format = get_default_value('format')
461 return format
464def copy_instructions_file_to_container(instructions_file, unique_dir):
465 shutil.copy2(instructions_file, unique_dir)
468def remove_tmp_files(list, remove_tmp, out_log, input_top_path_orig=None, input_top_path=None):
469 """ Removes temporal files generated by the wrapper """
470 tmp_files = list
471 if zipfile.is_zipfile(str(input_top_path_orig)):
472 tmp_files.append(PurePath(str(input_top_path)).parent)
474 if remove_tmp:
475 removed_files = [f for f in tmp_files if fu.rm(f)]
476 fu.log('Removed: %s' % str(removed_files), out_log)