Coverage for biobb_io/api/common.py: 84%

197 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-21 16:46 +0000

1"""Common functions for package api""" 

2 

3import json 

4import os 

5import re 

6import urllib.request 

7import urllib.parse 

8from pathlib import Path, PurePath 

9 

10import requests 

11from biobb_common.tools import file_utils as fu 

12 

13 

def check_output_path(path, argument, optional, out_log, classname) -> str:
    """Validate an output file path and hand it back unchanged.

    Args:
        path: Output file path to validate.
        argument: Name of the output argument; also the key used by
            ``is_valid_file`` to look up the accepted extensions.
        optional: When true, an empty ``path`` is accepted.
        out_log: Logger forwarded to ``fu.log`` for error messages.
        classname: Prefix prepended to every error message.

    Returns:
        str: ``""`` when the argument is optional and no path was given,
        otherwise ``path`` itself.

    Raises:
        SystemExit: If the parent folder does not exist or the file
            extension is not accepted for ``argument``.
    """
    if optional and not path:
        return ""

    target = PurePath(path)
    folder = target.parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting %s folder, exiting" % argument, out_log)
        raise SystemExit(classname + ": Unexisting %s folder" % argument)

    # Suffix without its leading dot, e.g. ".pdb" -> "pdb".
    extension = target.suffix[1:]
    if is_valid_file(extension, argument):
        return path

    message = classname + ": Format %s in %s file is not compatible" % (
        extension,
        argument,
    )
    fu.log(message, out_log)
    raise SystemExit(message)

33 

34 

def is_valid_file(ext, argument):
    """Tell whether extension ``ext`` is accepted for output ``argument``.

    Args:
        ext: File extension without the leading dot.
        argument: Name of the output argument being checked.

    Returns:
        bool: True when ``ext`` is one of the extensions registered for
        ``argument``.

    Raises:
        KeyError: If ``argument`` is not a known output argument.
    """
    accepted_extensions = {
        "output_sdf_path": ("sdf",),
        "output_pdb_path": ("pdb",),
        "output_simulations": ("json",),
        "output_simulation": ("zip",),
        "output_pdb_zip_path": ("zip",),
        "output_mutations_list_txt": ("txt",),
        "output_json_path": ("json",),
        "output_fasta_path": ("fasta",),
        "output_mmcif_path": ("mmcif", "cif"),
        "output_top_path": ("pdb",),
        "output_trj_path": ("mdcrd", "trr", "xtc"),
    }
    allowed = accepted_extensions[argument]
    return ext in allowed

51 

52 

def download_pdb(pdb_code, api_id, out_log=None, global_log=None):
    """Download the PDB-format file of an entry from one of the known APIs.

    Args:
        pdb_code: PDB identifier, inserted verbatim into the download URL.
        api_id: Source to query: ``"mmb"``, ``"pdb"`` or ``"pdbe"``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: Content of the pdb file (response body decoded as UTF-8).

    Raises:
        ValueError: If ``api_id`` is not one of the supported identifiers.
    """
    urls = {
        "mmb": "https://mmb.irbbarcelona.org/api/pdb/" + pdb_code + "/coords/?",
        "pdb": "https://files.rcsb.org/download/" + pdb_code + ".pdb",
        "pdbe": "https://www.ebi.ac.uk/pdbe/entry-files/download/pdb" + pdb_code + ".ent",
    }
    # Fail with a clear message instead of an UnboundLocalError on `url`.
    if api_id not in urls:
        raise ValueError(
            "download_pdb: unsupported api_id %r (expected one of: %s)"
            % (api_id, ", ".join(sorted(urls)))
        )
    url = urls[api_id]

    fu.log("Downloading %s from: %s" % (pdb_code, url), out_log, global_log)
    return requests.get(url).content.decode("utf-8")

68 

69 

def download_af(uniprot_code, out_log=None, global_log=None, classname=None):
    """Download the ``F1-model_v3`` PDB file for a UniProt code from alphafold.ebi.ac.uk.

    Args:
        uniprot_code: UniProt accession, inserted verbatim into the URL.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
        classname: Optional prefix for the error message.

    Returns:
        String: Content of the pdb file (response body decoded as UTF-8).

    Raises:
        SystemExit: If the server answers with HTTP 404.
    """
    url = "https://alphafold.ebi.ac.uk/files/AF-" + uniprot_code + "-F1-model_v3.pdb"

    fu.log("Downloading %s from: %s" % (uniprot_code, url), out_log, global_log)

    r = requests.get(url)
    if r.status_code == 404:
        # classname defaults to None; concatenating it blindly would raise
        # TypeError and hide the real error from the user.
        prefix = classname + ": " if classname else ""
        message = prefix + "Incorrect Uniprot Code: %s" % (uniprot_code)
        fu.log(message, out_log)
        raise SystemExit(message)

    return r.content.decode("utf-8")

86 

87 

def download_mddb_top(project_id, node_id, selection, out_log=None, global_log=None, classname=None):
    """Download the structure of a project from an ``mddbr.eu`` node.

    Args:
        project_id: Project identifier, inserted verbatim into the URL.
        node_id: Node name used as the host prefix of ``.mddbr.eu``.
        selection: Selection expression; converted to ``str`` and URL-quoted.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
        classname: Optional prefix for the error message.

    Returns:
        String: Content of the pdb file (response body decoded as UTF-8).

    Raises:
        SystemExit: If the server answers with HTTP 404.
    """
    url = "https://" + node_id + ".mddbr.eu/api/rest/v1/projects/" + project_id + "/structure?selection=" + urllib.parse.quote(str(selection))

    fu.log("Downloading %s topology from: %s" % (project_id, url), out_log, global_log)

    r = requests.get(url)
    if r.status_code == 404:
        # classname defaults to None; concatenating it blindly would raise
        # TypeError and hide the real error from the user.
        prefix = classname + ": " if classname else ""
        message = prefix + "Incorrect url, check project_id, node_id and selection: %s" % (url)
        fu.log(message, out_log)
        raise SystemExit(message)

    return r.content.decode("utf-8")

104 

105 

def download_mddb_trj(project_id, node_id, trj_format, frames, selection, out_log=None, global_log=None, classname=None):
    """Download a trajectory of a project from an ``mddbr.eu`` node.

    Args:
        project_id: Project identifier, inserted verbatim into the URL.
        node_id: Node name used as the host prefix of ``.mddbr.eu``.
        trj_format: Value of the ``format`` query parameter.
        frames: Value of the ``frames`` query parameter (must be a string).
        selection: Selection expression; converted to ``str`` and URL-quoted.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
        classname: Optional prefix for the error message.

    Returns:
        bytes: Raw content of the trajectory file (not decoded).

    Raises:
        SystemExit: If the server answers with HTTP 404.
    """
    url = "https://" + node_id + ".mddbr.eu/api/rest/v1/projects/" + project_id + "/trajectory?format=" + trj_format + "&frames=" + frames + "&selection=" + urllib.parse.quote(str(selection))

    fu.log("Downloading %s trajectory from: %s" % (project_id, url), out_log, global_log)

    r = requests.get(url)
    if r.status_code == 404:
        # classname defaults to None; concatenating it blindly would raise
        # TypeError and hide the real error from the user.
        prefix = classname + ": " if classname else ""
        message = prefix + "Incorrect url, check project_id, node_id, trj_format, frames and selection: %s" % (url)
        fu.log(message, out_log)
        raise SystemExit(message)

    return r.content

122 

123 

def download_mmcif(pdb_code, api_id, out_log=None, global_log=None):
    """Download the mmCIF file of an entry from one of the known APIs.

    Args:
        pdb_code: PDB identifier, inserted verbatim into the download URL.
        api_id: Source to query: ``"mmb"``, ``"pdb"`` or ``"pdbe"``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: Content of the mmcif file (response body decoded as UTF-8).

    Raises:
        ValueError: If ``api_id`` is not one of the supported identifiers.
    """
    urls = {
        "mmb": "http://mmb.irbbarcelona.org/api/pdb/" + pdb_code + ".cif",
        "pdb": "https://files.rcsb.org/download/" + pdb_code + ".cif",
        "pdbe": "https://www.ebi.ac.uk/pdbe/entry-files/download/" + pdb_code + ".cif",
    }
    # Fail with a clear message instead of an UnboundLocalError on `url`.
    if api_id not in urls:
        raise ValueError(
            "download_mmcif: unsupported api_id %r (expected one of: %s)"
            % (api_id, ", ".join(sorted(urls)))
        )
    url = urls[api_id]

    fu.log("Downloading %s from: %s" % (pdb_code, url), out_log, global_log)
    return requests.get(url, verify=True).content.decode("utf-8")

139 

140 

def download_ligand(ligand_code, api_id, out_log=None, global_log=None):
    """Download the PDB-format file of a ligand from one of the known APIs.

    Args:
        ligand_code: Ligand identifier; lower-cased for ``"mmb"`` and
            upper-cased for ``"pdbe"`` before being placed in the URL.
        api_id: Source to query: ``"mmb"`` or ``"pdbe"``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: Content of the ligand file with empty lines removed.

    Raises:
        ValueError: If ``api_id`` is not one of the supported identifiers.
    """
    if api_id == "mmb":
        url = "http://mmb.irbbarcelona.org/api/pdbMonomer/" + ligand_code.lower()
    elif api_id == "pdbe":
        url = (
            "https://www.ebi.ac.uk/pdbe/static/files/pdbechem_v2/" + ligand_code.upper() + "_ideal.pdb"
        )
    else:
        # Fail with a clear message instead of an UnboundLocalError on `url`.
        raise ValueError(
            "download_ligand: unsupported api_id %r (expected one of: mmb, pdbe)"
            % (api_id,)
        )

    # Log before fetching so the URL is recorded even if the download fails.
    fu.log("Downloading %s from: %s" % (ligand_code, url), out_log, global_log)

    if api_id == "mmb":
        text = requests.get(url, verify=True).content.decode("utf-8")
    else:
        # Context manager guarantees the HTTP response is closed.
        with urllib.request.urlopen(url) as response:
            text = response.read().decode("utf-8")

    # removing useless empty lines at the end of the file
    text = os.linesep.join([s for s in text.splitlines() if s])

    return text

162 

163 

def download_fasta(pdb_code, api_id, out_log=None, global_log=None):
    """Download the FASTA file of an entry from one of the known APIs.

    Args:
        pdb_code: PDB identifier, inserted verbatim into the download URL.
        api_id: Source to query: ``"mmb"``, ``"pdb"`` or ``"pdbe"``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: Content of the fasta file (response body decoded as UTF-8).

    Raises:
        ValueError: If ``api_id`` is not one of the supported identifiers.
    """
    urls = {
        "mmb": "http://mmb.irbbarcelona.org/api/pdb/" + pdb_code + ".fasta",
        "pdb": "https://www.rcsb.org/fasta/entry/" + pdb_code,
        "pdbe": "https://www.ebi.ac.uk/pdbe/entry/pdb/" + pdb_code + "/fasta",
    }
    # Fail with a clear message instead of an UnboundLocalError on `url`.
    if api_id not in urls:
        raise ValueError(
            "download_fasta: unsupported api_id %r (expected one of: %s)"
            % (api_id, ", ".join(sorted(urls)))
        )
    url = urls[api_id]

    fu.log("Downloading %s from: %s" % (pdb_code, url), out_log, global_log)
    return requests.get(url, verify=True).content.decode("utf-8")

179 

180 

def download_binding_site(
    pdb_code,
    url="https://www.ebi.ac.uk/pdbe/api/pdb/entry/binding_sites/%s",
    out_log=None,
    global_log=None,
):
    """Fetch the binding-sites JSON of an entry and return it pretty-printed.

    Args:
        pdb_code: PDB identifier substituted into ``url``.
        url: URL template containing one ``%s`` placeholder for ``pdb_code``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: The JSON response re-serialised with a 4-space indent and
        sorted keys.
    """
    url = url % pdb_code

    fu.log("Getting binding sites from: %s" % (url), out_log, global_log)

    # Context manager guarantees the HTTP response is closed.
    with urllib.request.urlopen(url) as response:
        text = response.read()
    json_obj = json.loads(text)

    return json.dumps(json_obj, indent=4, sort_keys=True)

201 

202 

def download_ideal_sdf(ligand_code, api_id, out_log=None, global_log=None):
    """Download the ``_ideal.sdf`` file of a ligand from one of the known APIs.

    Args:
        ligand_code: Ligand identifier; upper-cased before being placed in
            the URL.
        api_id: Source to query: ``"pdb"`` or ``"pdbe"``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: Content of the ideal sdf file (decoded as UTF-8).

    Raises:
        ValueError: If ``api_id`` is not one of the supported identifiers.
    """
    if api_id == "pdb":
        url = (
            "https://files.rcsb.org/ligands/download/" + ligand_code.upper() + "_ideal.sdf"
        )
    elif api_id == "pdbe":
        url = (
            "https://www.ebi.ac.uk/pdbe/static/files/pdbechem_v2/" + ligand_code.upper() + "_ideal.sdf"
        )
    else:
        # Fail with a clear message instead of an UnboundLocalError on `url`.
        raise ValueError(
            "download_ideal_sdf: unsupported api_id %r (expected one of: pdb, pdbe)"
            % (api_id,)
        )

    # Log before fetching so the URL is recorded even if the download fails.
    fu.log("Downloading %s from: %s" % (ligand_code, url), out_log, global_log)

    if api_id == "pdb":
        text = requests.get(url, verify=True).content.decode("utf-8")
    else:
        # Context manager guarantees the HTTP response is closed.
        with urllib.request.urlopen(url) as response:
            text = response.read().decode("utf-8")

    return text

223 

224 

def download_str_info(
    pdb_code,
    url="http://mmb.irbbarcelona.org/api/pdb/%s.json",
    out_log=None,
    global_log=None,
):
    """Fetch the structure-info JSON of an entry and return it pretty-printed.

    Args:
        pdb_code: PDB identifier substituted into ``url``.
        url: URL template containing one ``%s`` placeholder for ``pdb_code``.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        String: The JSON response re-serialised with a 4-space indent and
        sorted keys.
    """
    url = url % pdb_code

    fu.log("Getting structure info from: %s" % (url), out_log, global_log)

    # Context manager guarantees the HTTP response is closed.
    with urllib.request.urlopen(url) as response:
        text = response.read()
    json_obj = json.loads(text)

    return json.dumps(json_obj, indent=4, sort_keys=True)

245 

246 

def write_pdb(pdb_string, output_pdb_path, filt=None, out_log=None, global_log=None):
    """Write a PDB string to disk, optionally keeping only selected records.

    Args:
        pdb_string: Full text of the PDB file.
        output_pdb_path: Destination path, opened in text write mode.
        filt: Optional collection of record names. When given, only lines
            whose first whitespace-separated word (cut to 6 characters) is
            in ``filt`` are written; blank lines are dropped.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    fu.log("Writting pdb to: %s" % (output_pdb_path), out_log, global_log)
    with open(output_pdb_path, "w") as output_pdb_file:
        if not filt:
            output_pdb_file.write(pdb_string)
            return

        fu.log(
            "Filtering lines NOT starting with one of these words: %s" % str(filt),
            out_log,
            global_log,
        )
        for line in pdb_string.splitlines(True):
            words = line.split()
            # Blank or whitespace-only lines have no first word; skip them
            # instead of failing with IndexError.
            if words and words[0][0:6] in filt:
                output_pdb_file.write(line)

262 

263 

def write_bin(bin_string, output_bin_path, out_log=None, global_log=None):
    """Log the destination, then write binary content to it.

    Args:
        bin_string: Bytes-like content to write.
        output_bin_path: Destination path, opened in binary write mode.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    notice = "Writting bin to: %s" % (output_bin_path)
    fu.log(notice, out_log, global_log)
    with open(output_bin_path, mode="wb") as sink:
        sink.write(bin_string)

269 

270 

def write_mmcif(mmcif_string, output_mmcif_path, out_log=None, global_log=None):
    """Log the destination, then write mmCIF text to it.

    Args:
        mmcif_string: Text content to write.
        output_mmcif_path: Destination path, opened in text write mode.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    notice = "Writting mmcif to: %s" % (output_mmcif_path)
    fu.log(notice, out_log, global_log)
    with open(output_mmcif_path, mode="w") as sink:
        sink.write(mmcif_string)

276 

277 

def write_fasta(fasta_string, output_fasta_path, out_log=None, global_log=None):
    """Log the destination, then write FASTA text to it.

    Args:
        fasta_string: Text content to write.
        output_fasta_path: Destination path, opened in text write mode.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    notice = "Writting FASTA to: %s" % (output_fasta_path)
    fu.log(notice, out_log, global_log)
    with open(output_fasta_path, mode="w") as sink:
        sink.write(fasta_string)

283 

284 

def write_sdf(sdf_string, output_sdf_path, out_log=None, global_log=None):
    """Log the destination, then write SDF text to it.

    Args:
        sdf_string: Text content to write.
        output_sdf_path: Destination path, opened in text write mode.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    notice = "Writting sdf to: %s" % (output_sdf_path)
    fu.log(notice, out_log, global_log)
    with open(output_sdf_path, mode="w") as sink:
        sink.write(sdf_string)

290 

291 

def get_cluster_pdb_codes(pdb_code, cluster, out_log=None, global_log=None):
    """Collect the PDB codes that share a cluster with ``pdb_code``.

    Args:
        pdb_code: PDB identifier; lower-cased before being placed in the URL.
        cluster: Cluster identifier, converted with ``str`` for the URL.
        out_log: Optional logger; its ``info`` method receives the summary.
        global_log: Optional logger; receives the summary prefixed with
            ``fu.get_logs_prefix()``.

    Returns:
        String set: Lower-cased ``_id`` of every ``clusterMembers`` entry.
    """
    endpoint = (
        "http://mmb.irbbarcelona.org/api/pdb/"
        + pdb_code.lower()
        + "/clusters/cl-"
        + str(cluster)
        + ".json"
    )
    raw_body = requests.get(endpoint, verify=True).content.decode("utf-8")
    members = json.loads(raw_body)["clusterMembers"]
    pdb_codes = {member["_id"].lower() for member in members}

    if out_log or global_log:
        summary = "Cluster: " + str(cluster) + " of pdb_code: " + pdb_code + "\n List: " + str(pdb_codes)
        if out_log:
            out_log.info(summary)
        if global_log:
            global_log.info(fu.get_logs_prefix() + summary)

    return pdb_codes

317 

318 

def get_uniprot(pdb_code, url, out_log=None, global_log=None):
    """Look up the UNIPROT code that corresponds to ``pdb_code``.

    Args:
        pdb_code: PDB identifier; lower-cased before being placed in the URL.
        url: Base URL of the API to query.
        out_log: Optional logger; its ``info`` method receives the result.
        global_log: Optional logger; its ``info`` method receives the result.

    Returns:
        str: First element of the ``uniprotRefs._id`` field of the JSON
        response.
    """
    lookup_url = url + "/pdb/" + pdb_code.lower() + "/entry/uniprotRefs/_id"
    payload = requests.get(lookup_url, verify=True).json()
    uniprot_id = payload["uniprotRefs._id"][0]

    # Same message goes to both loggers, in the order out_log, global_log.
    for logger in (out_log, global_log):
        if logger:
            logger.info(
                "PDB code: " + pdb_code + " correspond to uniprot id: " + uniprot_id
            )

    return uniprot_id

338 

339 

def get_variants(
    uniprot_id, url="http://mmb.irbbarcelona.org/api", out_log=None, global_log=None
):
    """Fetch the variants recorded for the ``uniprot_id`` code.

    Args:
        uniprot_id: UniProt identifier, inserted verbatim into the URL.
        url: Base URL of the API to query.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        :obj:`list` of :obj:`str`: The ``variants.vardata.mut`` field of the
        JSON response, or an empty list when that field is falsy.
    """
    endpoint = url + "/uniprot/" + uniprot_id + "/entry/variants/vardata/mut/?varorig=humsavar"
    payload = requests.get(endpoint, verify=True).json()
    # A falsy field (e.g. None) is normalised to an empty list.
    variants = payload["variants.vardata.mut"] or []

    fu.log(
        "Found: %d variants for uniprot id: %s" % (len(variants), uniprot_id),
        out_log,
        global_log,
    )
    return variants

360 

361 

def write_json(json_string, output_json_path, out_log=None, global_log=None):
    """Log the destination, then write JSON text to it.

    Args:
        json_string: Text content to write.
        output_json_path: Destination path, opened in text write mode.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    notice = "Writting json to: %s" % (output_json_path)
    fu.log(notice, out_log, global_log)
    with open(output_json_path, mode="w") as sink:
        sink.write(json_string)

367 

368 

def get_memprotmd_sim_list(out_log=None, global_log=None):
    """Fetch every simulation listed by the MemProtMD REST API.

    Args:
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        str: The JSON response of the ``simulations/all`` endpoint,
        re-serialised with a 4-space indent.
    """
    fu.log(
        "Getting all available membrane-protein systems (simulations) from the MemProtMD REST API",
        out_log,
        global_log,
    )

    # The endpoint is queried with POST and no payload.
    simulations = requests.post("http://memprotmd.bioch.ox.ac.uk/api/simulations/all").json()
    serialized = json.dumps(simulations, indent=4)

    fu.log("Total number of simulations: %d" % (len(simulations)), out_log, global_log)

    return serialized

385 

386 

def get_memprotmd_sim_search(collection_name, keyword, out_log=None, global_log=None):
    """Run a keyword search against the MemProtMD advanced-search endpoint.

    Args:
        collection_name: Value sent as ``collectionName`` in the query.
        keyword: Value sent as ``query.keywords`` in the query.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.

    Returns:
        str: The JSON response re-serialised with a 4-space indent.
    """
    fu.log(
        "Getting search results from the MemProtMD REST API. Collection name: %s, keyword: %s"
        % (collection_name, keyword),
        out_log,
        global_log,
    )

    search_payload = {
        "collectionName": collection_name,
        "query": {"keywords": keyword},
        "projection": {"simulations": 1},
        "options": {},
    }
    results = requests.post(
        "http://memprotmd.bioch.ox.ac.uk/api/search/advanced", json=search_payload
    ).json()
    serialized = json.dumps(results, indent=4)

    # Flatten the per-result "simulations" lists to count them all.
    all_simulations = [sim for entry in results for sim in entry["simulations"]]

    fu.log("Total number of simulations: %d" % (len(all_simulations)), out_log, global_log)

    return serialized

417 

418 

def get_memprotmd_sim(pdb_code, output_file, out_log=None, global_log=None):
    """Download the ``at.zip`` simulation archive of an entry from MemProtMD.

    Args:
        pdb_code: PDB identifier, inserted verbatim into the download URL.
        output_file: Destination path; the response body is written to it
            in binary mode.
        out_log: Logger forwarded to ``fu.log``.
        global_log: Global logger forwarded to ``fu.log``.
    """
    fu.log("Getting simulation file from pdb code %s" % (pdb_code), out_log, global_log)

    url = (
        "http://memprotmd.bioch.ox.ac.uk/data/memprotmd/simulations/" + pdb_code + "_default_dppc/files/run/at.zip"
    )
    response = requests.get(url)

    # Context manager closes (and flushes) the file deterministically; the
    # previous bare open(...).write(...) left the handle to the garbage
    # collector.
    with open(output_file, "wb") as output_handle:
        output_handle.write(response.content)

    fu.log("Saving output %s file" % (output_file), out_log, global_log)

432 

433 

def check_mandatory_property(property, name, out_log, classname):
    """Return ``property`` when it is set, otherwise abort.

    Args:
        property: Value of the property to check.
        name: Property name used in the error message.
        out_log: Logger forwarded to ``fu.log``.
        classname: Prefix prepended to the error message.

    Returns:
        The ``property`` value, unchanged, when it is truthy.

    Raises:
        SystemExit: If ``property`` is falsy.
    """
    if property:
        return property

    fu.log(classname + ": Unexisting %s property, exiting" % name, out_log)
    raise SystemExit(classname + ": Unexisting %s property" % name)

441 

442 

def check_uniprot_code(code, out_log, classname):
    """Validate that ``code`` is a well-formed UniProt accession.

    The whole string must match the accession pattern; a valid prefix
    followed by extra characters is rejected.

    Args:
        code: Candidate UniProt accession.
        out_log: Logger forwarded to ``fu.log``.
        classname: Prefix prepended to the error message.

    Returns:
        bool: Always True when the code is valid.

    Raises:
        SystemExit: If ``code`` is not a string or does not match the pattern.
    """
    pattern = re.compile(
        (r"[OPQ][0-9][A-Z0-9]{3}[0-9]|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}")
    )

    # fullmatch (not match): match() only anchors at the start, so a valid
    # accession followed by garbage would pass.
    if not isinstance(code, str) or not pattern.fullmatch(code):
        fu.log(classname + ": Incorrect uniprot code for %s" % code, out_log)
        raise SystemExit(classname + ": Incorrect uniprot code for %s" % code)

    return True