Coverage for biobb_analysis/ambertools/common.py: 65%

219 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 15:22 +0000

1""" Common functions for package biobb_analysis.ambertools """ 

2from pathlib import Path, PurePath 

3import zipfile 

4import shutil 

5from biobb_common.tools import file_utils as fu 

6 

7 

8def check_top_path(path, out_log, classname): 

9 """ Checks topology input file """ 

10 orig_path = path 

11 if not Path(path).exists(): 

12 fu.log(classname + ': Unexisting topology input file, exiting', out_log) 

13 raise SystemExit(classname + ': Unexisting topology input file') 

14 file_extension = PurePath(path).suffix 

15 if not is_valid_topology(file_extension[1:]): 

16 fu.log(classname + ': Format %s in topology input file is not compatible' % file_extension[1:], out_log) 

17 raise SystemExit(classname + ': Format %s in topology input file is not compatible' % file_extension[1:]) 

18 if zipfile.is_zipfile(path): 

19 top_file = fu.unzip_top(zip_file=path, out_log=out_log) 

20 path = top_file 

21 return path, orig_path 

22 

23 

24def check_traj_path(path, out_log, classname): 

25 """ Checks trajectory input file """ 

26 if not Path(path).exists(): 

27 fu.log(classname + ': Unexisting trajectory input file, exiting', out_log) 

28 raise SystemExit(classname + ': Unexisting trajectory input file') 

29 file_extension = PurePath(path).suffix 

30 if not is_valid_trajectory(file_extension[1:]): 

31 fu.log(classname + ': Format %s in trajectory input file is not compatible' % file_extension[1:], out_log) 

32 raise SystemExit(classname + ': Format %s in trajectory input file is not compatible' % file_extension[1:]) 

33 return path 

34 

35 

36def check_out_path(path, out_log, classname): 

37 """ Checks if output folder exists """ 

38 if PurePath(path).parent and not Path(PurePath(path).parent).exists(): 

39 fu.log(classname + ': Unexisting output folder, exiting', out_log) 

40 raise SystemExit(classname + ': Unexisting output folder') 

41 return path 

42 

43 

44def get_parameters(properties, type, classname, out_log): 

45 """ Gets in_parameters and out_parameters """ 

46 if not properties.get(type, dict()): 

47 fu.log('No %s parameters provided' % type, out_log) 

48 return get_default_value(classname)[type] 

49 

50 return {k: v for k, v in properties.get(type, dict()).items()} 

51 

52 

53def get_binary_path(properties, type): 

54 """ Gets binary path """ 

55 return properties.get(type, get_default_value(type)) 

56 

57 

58def check_in_path(path, out_log, classname): 

59 """ Checks input instructions file """ 

60 if not Path(path).exists(): 

61 fu.log(classname + ': Unexisting input instructions file, exiting', out_log) 

62 raise SystemExit(classname + ': Unexisting input instructions file') 

63 

64 # check syntax for instructions file 

65 syntax_instructions = True 

66 err_instructions = '' 

67 if 'parm ' not in open(path).read(): 

68 syntax_instructions = False 

69 err_instructions += 'No topology provided' 

70 if 'trajin ' not in open(path).read(): 

71 syntax_instructions = False 

72 err_instructions += ' No input trajectory provided' 

73 if not syntax_instructions: 

74 fu.log(classname + ': Incorrect syntax for instructions file: %s, exiting' % err_instructions, out_log) 

75 raise SystemExit(classname + ': Incorrect syntax for instructions file: %s, exiting' % err_instructions) 

76 

77 

78def get_default_value(key): 

79 """ Gives default values according to the given key """ 

80 default_values = { 

81 "start": 1, 

82 "end": -1, 

83 "step": 1, 

84 "snapshot": 1, 

85 "format": "netcdf", 

86 "mask": "all-atoms", 

87 "reference": "first", 

88 "average": "MyAvg", 

89 "instructions_file": "instructions.in", 

90 "binary_path": "cpptraj", 

91 # default conf for Average 

92 "Average": { 

93 "in_parameters": { 

94 "start": 1, 

95 "end": -1, 

96 "step": 1, 

97 "mask": "all-atoms" 

98 }, 

99 "out_parameters": { 

100 "format": "pdb" 

101 } 

102 }, 

103 # default conf for Bfactor 

104 "Bfactor": { 

105 "in_parameters": { 

106 "start": 1, 

107 "end": -1, 

108 "step": 1, 

109 "mask": "all-atoms", 

110 "reference": "first" 

111 } 

112 }, 

113 # default conf for Convert 

114 "Convert": { 

115 "in_parameters": { 

116 "start": 1, 

117 "end": -1, 

118 "step": 1, 

119 "mask": "all-atoms" 

120 }, 

121 "out_parameters": { 

122 "format": "netcdf" 

123 } 

124 }, 

125 # default conf for Dry 

126 "Dry": { 

127 "in_parameters": { 

128 "start": 1, 

129 "end": -1, 

130 "step": 1, 

131 "mask": "all-atoms" 

132 }, 

133 "out_parameters": { 

134 "format": "netcdf" 

135 } 

136 }, 

137 # default conf for Image 

138 "Image": { 

139 "in_parameters": { 

140 "start": 1, 

141 "end": -1, 

142 "step": 1, 

143 "mask": "all-atoms" 

144 }, 

145 "out_parameters": { 

146 "format": "netcdf" 

147 } 

148 }, 

149 # default conf for Mask 

150 "Mask": { 

151 "in_parameters": { 

152 "start": 1, 

153 "end": -1, 

154 "step": 1, 

155 "mask": "all-atoms" 

156 }, 

157 "out_parameters": { 

158 "format": "netcdf" 

159 } 

160 }, 

161 # default conf for Rgyr 

162 "Rgyr": { 

163 "in_parameters": { 

164 "start": 1, 

165 "end": -1, 

166 "step": 1, 

167 "mask": "all-atoms" 

168 } 

169 }, 

170 # default conf for Rms 

171 "Rms": { 

172 "in_parameters": { 

173 "start": 1, 

174 "end": -1, 

175 "step": 1, 

176 "mask": "all-atoms", 

177 "reference": "first" 

178 } 

179 }, 

180 # default conf for Rmsf 

181 "Rmsf": { 

182 "in_parameters": { 

183 "start": 1, 

184 "end": -1, 

185 "step": 1, 

186 "mask": "all-atoms", 

187 "reference": "first" 

188 } 

189 }, 

190 # default conf for Slice 

191 "Slice": { 

192 "in_parameters": { 

193 "start": 1, 

194 "end": -1, 

195 "step": 1, 

196 "mask": "all-atoms" 

197 }, 

198 "out_parameters": { 

199 "format": "netcdf" 

200 } 

201 }, 

202 # default conf for Snapshot 

203 "Snapshot": { 

204 "in_parameters": { 

205 "snapshot": 12, 

206 "mask": "all-atoms" 

207 }, 

208 "out_parameters": { 

209 "format": "pdb" 

210 } 

211 }, 

212 # default conf for Strip 

213 "Strip": { 

214 "in_parameters": { 

215 "start": 1, 

216 "end": -1, 

217 "step": 1, 

218 "mask": "all-atoms" 

219 }, 

220 "out_parameters": { 

221 "format": "netcdf" 

222 } 

223 } 

224 } 

225 

226 return default_values[key] 

227 

228 

229def is_valid_topology(ext): 

230 """ Checks if trajectory format is compatible with Cpptraj """ 

231 formats = 'top', 'pdb', 'prmtop', 'parmtop', 'zip' 

232 return ext in formats 

233 

234 

235def is_valid_trajectory(traj): 

236 """ Checks if trajectory format is compatible with Cpptraj """ 

237 formats = 'mdcrd', 'crd', 'cdf', 'netcdf', 'nc', 'restart', 'ncrestart', 'restartnc', 'dcd', 'charmm', 'cor', 'pdb', 'mol2', 'trr', 'gro', 'binpos', 'xtc', 'cif', 'arc', 'sqm', 'sdf', 'conflib' 

238 return traj in formats 

239 

240 

241def is_valid_reference(ref): 

242 """ Checks if reference is correct """ 

243 references = 'first', 'average', 'experimental' 

244 return ref in references 

245 

246 

247def get_mask_atoms(key): 

248 """ Gives mask atoms according to the given key """ 

249 masks = { 

250 "c-alpha": "@CA", 

251 "backbone": "@C,CA,N,O,C3',O3',C4',C5',O5',P", 

252 "all-atoms": ":*", 

253 "heavy-atoms": "!@H*,1H*,2H*,3H*", 

254 "side-chain": "!@CA,C,N,O,H,HA,C3',O3',C4',C5',O5',P", 

255 "solute": "!:WAT,HOH,SOL,TIP3,TP3,SOD,CLA,Na+,Cl-,NA,CL,K+,K", 

256 "ions": ":SOD,CLA,Na+,Cl-,NA,CL,K+,K", 

257 "solvent": ":WAT,HOH,SOL,TIP3,TP3" 

258 } 

259 

260 # if key incorrect, return default value and message 

261 if key in masks: 

262 return masks[key], None 

263 else: 

264 return key, None # Allow for Amber mask 

265 

266 

267def get_in_parameters(list, out_log, type='None'): 

268 """ Return string with input parameters """ 

269 # if strip or mask, no mandatory trajin parameters 

270 if type == 'strip' or type == 'mask': 

271 start = '' if 'start' not in list else str(list['start']) 

272 end = '' if 'end' not in list else str(list['end']) 

273 step = '' if 'step' not in list else str(list['step']) 

274 if (not start or start == 'None') and (not end or end == 'None') and (not step or step == 'None'): 

275 return '' 

276 else: 

277 if not start: 

278 start = str(get_default_value("start")) 

279 fu.log('No start value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('start'), out_log) 

280 if not end: 

281 end = str(get_default_value("end")) 

282 fu.log('No end value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('end'), out_log) 

283 if not step: 

284 step = str(get_default_value("step")) 

285 fu.log('No step value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('step'), out_log) 

286 else: 

287 # check if trajin parameters are provided and have correct format 

288 if (type == 'snapshot'): 

289 snapshot = str(get_default_value("snapshot")) if 'snapshot' not in list else str(list['snapshot']) 

290 if 'snapshot' not in list or not list['snapshot'] or not isinstance(list['snapshot'], int): 

291 snapshot = str(get_default_value("snapshot")) 

292 fu.log('No snapshot value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('snapshot'), out_log) 

293 start = snapshot 

294 end = snapshot 

295 step = '1' 

296 else: 

297 start = str(get_default_value("start")) if 'start' not in list else str(list['start']) 

298 if 'start' not in list or not list['start'] or not isinstance(list['start'], int): 

299 start = str(get_default_value("start")) 

300 fu.log('No start value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('start'), out_log) 

301 end = str(get_default_value("end")) if 'end' not in list else str(list['end']) 

302 if 'end' not in list or not list['end'] or not isinstance(list['end'], int): 

303 end = str(get_default_value("end")) 

304 fu.log('No end value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('end'), out_log) 

305 step = str(get_default_value("step")) if 'step' not in list else str(list['step']) 

306 if 'step' not in list or not list['step'] or not isinstance(list['step'], int): 

307 step = str(get_default_value("step")) 

308 fu.log('No step value provided in configuration file or incorrect format, assigned default value: %s' % get_default_value('step'), out_log) 

309 

310 # checking start <= end 

311 if end != '-1' and start > end: 

312 fu.log('End must be -1 (indicating the end of the trajectory) or more or equal than start. Your values are start: %s, end: %s' % (start, end), out_log) 

313 raise SystemExit('End must be -1 (indicating the end of the trajectory) or greater or equal than start. Your values are start: %s, end: %s' % (start, end)) 

314 

315 return start + " " + end + " " + step 

316 

317 

318def setup_structure(out_log): 

319 """ Sets up the structure """ 

320 instructions_list = [] 

321 mask_atoms = get_mask('heavy-atoms', out_log) 

322 instructions_list.append('center ' + mask_atoms + ' origin') 

323 instructions_list.append('autoimage') 

324 instructions_list.append('rms first ' + mask_atoms) 

325 mask_solvent = get_mask('solvent', out_log) 

326 mask_ions = get_mask('ions', out_log) 

327 instructions_list.append('strip ' + mask_solvent + ',' + mask_ions[1:]) 

328 

329 return instructions_list 

330 

331 

332def get_negative_mask(key, out_log): 

333 """ Gives the negative mask according to the given key """ 

334 atoms, msg = get_mask_atoms(key) 

335 if atoms[0] == '!': 

336 mask = atoms[1:] 

337 else: 

338 mask = '!' + atoms 

339 # if mask incorrect, give message 

340 if msg: 

341 fu.log(msg, out_log) 

342 

343 return mask 

344 

345 

346def get_mask(key, out_log): 

347 """ Gives mask according to the given key """ 

348 mask, msg = get_mask_atoms(key) 

349 # if mask incorrect, give message 

350 if msg: 

351 fu.log(msg, out_log) 

352 

353 return mask 

354 

355 

356def get_reference(ref, output_cpptraj_path, input_exp_path, mask, output, classname, out_log): 

357 """ Gives reference instructions according to the given key """ 

358 instructions_list = [] 

359 if not ref or ref == 'None': 

360 ref = get_default_value('reference') 

361 fu.log('No reference provided in configuration file, assigned default value: %s' % get_default_value('reference'), out_log) 

362 

363 if not is_valid_reference(ref): 

364 fu.log('Reference %s is not compatible, assigned default value: %s' % (ref, get_default_value('reference')), out_log) 

365 ref = get_default_value('reference') 

366 

367 if ref == 'first': 

368 if output: 

369 instructions_list.append('rms first out ' + output_cpptraj_path) 

370 else: 

371 instructions_list.append('rms first') 

372 

373 if ref == 'average': 

374 instructions_list.append('average crdset ' + get_default_value('average')) 

375 instructions_list.append('run') 

376 if output: 

377 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask + ' out ' + output_cpptraj_path) 

378 else: 

379 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask) 

380 

381 if ref == 'experimental': 

382 if not input_exp_path: 

383 fu.log('No experimental structure provided, exiting', out_log) 

384 raise SystemExit(classname + ': input_exp_path is mandatory') 

385 instructions_list.append('parm ' + input_exp_path + ' noconect [exp]') 

386 solute, msg = get_mask_atoms('solute') 

387 instructions_list.append('reference ' + input_exp_path + ' ' + solute + ' parm [exp]') 

388 backbone, msg = get_mask_atoms('backbone') 

389 if output: 

390 instructions_list.append('rms reference ' + mask + ' out ' + output_cpptraj_path) 

391 else: 

392 instructions_list.append('rms reference ' + mask) 

393 

394 return instructions_list 

395 

396 

397def get_reference_rms(ref, output_cpptraj_path, input_exp_path, mask, output, classname, out_log, nofit=False, norotate=False, nomod=False): 

398 """ Gives reference instructions according to the given key """ 

399 instructions_list = [] 

400 if not ref or ref == 'None': 

401 ref = get_default_value('reference') 

402 fu.log('No reference provided in configuration file, assigned default value: %s' % get_default_value('reference'), out_log) 

403 

404 if not is_valid_reference(ref): 

405 fu.log('Reference %s is not compatible, assigned default value: %s' % (ref, get_default_value('reference')), out_log) 

406 ref = get_default_value('reference') 

407 

408 flags = [] 

409 if nofit: 

410 flags.append("nofit") 

411 if norotate: 

412 flags.append("norotate") 

413 if nomod: 

414 flags.append("nomod") 

415 

416 flags_str = " ".join(flags) 

417 

418 if ref == 'first': 

419 if output: 

420 instructions_list.append('rms first out ' + output_cpptraj_path + f' {flags_str}') 

421 else: 

422 instructions_list.append('rms first' + f' {flags_str}') 

423 

424 if ref == 'average': 

425 instructions_list.append('average crdset ' + get_default_value('average')) 

426 instructions_list.append('run') 

427 if output: 

428 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask + ' out ' + output_cpptraj_path + f' {flags_str}') 

429 else: 

430 instructions_list.append('rms ref ' + get_default_value('average') + ' ' + mask + f' {flags_str}') 

431 

432 if ref == 'experimental': 

433 if not input_exp_path: 

434 fu.log('No experimental structure provided, exiting', out_log) 

435 raise SystemExit(classname + ': input_exp_path is mandatory') 

436 instructions_list.append('parm ' + input_exp_path + ' noconect [exp]') 

437 solute, msg = get_mask_atoms('solute') 

438 instructions_list.append('reference ' + input_exp_path + ' ' + solute + ' parm [exp]') 

439 backbone, msg = get_mask_atoms('backbone') 

440 if output: 

441 instructions_list.append('rms reference ' + mask + ' out ' + output_cpptraj_path + f' {flags_str}') 

442 else: 

443 instructions_list.append('rms reference ' + mask + f' {flags_str}') 

444 

445 return instructions_list 

446 

447 

448def get_out_parameters(list, out_log): 

449 """ Return string with output parameters """ 

450 format = list['format'] 

451 # msg = None 

452 # check if format provided 

453 if not format: 

454 format = get_default_value('format') 

455 fu.log('No format provided in configuration file, assigned default value: %s' % get_default_value('format'), out_log) 

456 # check if valid format 

457 if not is_valid_trajectory(format): 

458 fu.log('Format %s is not compatible, assigned default value: %s' % (format, get_default_value('format')), out_log) 

459 format = get_default_value('format') 

460 

461 return format 

462 

463 

464def copy_instructions_file_to_container(instructions_file, unique_dir): 

465 shutil.copy2(instructions_file, unique_dir) 

466 

467 

468def remove_tmp_files(list, remove_tmp, out_log, input_top_path_orig=None, input_top_path=None): 

469 """ Removes temporal files generated by the wrapper """ 

470 tmp_files = list 

471 if zipfile.is_zipfile(input_top_path_orig): 

472 tmp_files.append(PurePath(input_top_path).parent) 

473 

474 if remove_tmp: 

475 removed_files = [f for f in tmp_files if fu.rm(f)] 

476 fu.log('Removed: %s' % str(removed_files), out_log)