Coverage for biobb_io/api/common.py: 85%

181 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-07 10:41 +0000

1""" Common functions for package api """ 

2import os 

3import json 

4import requests 

5import re 

6import urllib.request 

7from pathlib import Path, PurePath 

8from biobb_common.tools import file_utils as fu 

9 

10 

11def check_output_path(path, argument, optional, out_log, classname): 

12 """ Checks output file """ 

13 if optional and not path: 

14 return None 

15 if PurePath(path).parent and not Path(PurePath(path).parent).exists(): 

16 fu.log(classname + ': Unexisting %s folder, exiting' % argument, out_log) 

17 raise SystemExit(classname + ': Unexisting %s folder' % argument) 

18 file_extension = PurePath(path).suffix 

19 if not is_valid_file(file_extension[1:], argument): 

20 fu.log(classname + ': Format %s in %s file is not compatible' % (file_extension[1:], argument), out_log) 

21 raise SystemExit(classname + ': Format %s in %s file is not compatible' % (file_extension[1:], argument)) 

22 return path 

23 

24 

25def is_valid_file(ext, argument): 

26 """ Checks if file format is compatible """ 

27 formats = { 

28 'output_sdf_path': ['sdf'], 

29 'output_pdb_path': ['pdb'], 

30 'output_simulations': ['json'], 

31 'output_simulation': ['zip'], 

32 'output_pdb_zip_path': ['zip'], 

33 'output_mutations_list_txt': ['txt'], 

34 'output_json_path': ['json'], 

35 'output_fasta_path': ['fasta'], 

36 'output_mmcif_path': ['mmcif', 'cif'], 

37 } 

38 return ext in formats[argument] 

39 

40 

41def download_pdb(pdb_code, api_id, out_log=None, global_log=None): 

42 """ 

43 Returns: 

44 String: Content of the pdb file. 

45 """ 

46 

47 if api_id == 'mmb': 

48 url = "https://mmb.irbbarcelona.org/api/pdb/" + pdb_code + "/coords/?" 

49 elif api_id == 'pdb': 

50 url = "https://files.rcsb.org/download/" + pdb_code + ".pdb" 

51 elif api_id == 'pdbe': 

52 url = "https://www.ebi.ac.uk/pdbe/entry-files/download/pdb" + pdb_code + ".ent" 

53 

54 fu.log("Downloading %s from: %s" % (pdb_code, url), out_log, global_log) 

55 return requests.get(url).content.decode('utf-8') 

56 

57 

58def download_af(uniprot_code, out_log=None, global_log=None, classname=None): 

59 """ 

60 Returns: 

61 String: Content of the pdb file. 

62 """ 

63 

64 url = "https://alphafold.ebi.ac.uk/files/AF-" + uniprot_code + "-F1-model_v3.pdb" 

65 

66 fu.log("Downloading %s from: %s" % (uniprot_code, url), out_log, global_log) 

67 

68 r = requests.get(url) 

69 if (r.status_code == 404): 

70 fu.log(classname + ': Incorrect Uniprot Code: %s' % (uniprot_code), out_log) 

71 raise SystemExit(classname + ': Incorrect Uniprot Code: %s' % (uniprot_code)) 

72 

73 return r.content.decode('utf-8') 

74 

75 

76def download_mmcif(pdb_code, api_id, out_log=None, global_log=None): 

77 """ 

78 Returns: 

79 String: Content of the mmcif file. 

80 """ 

81 

82 if api_id == 'mmb': 

83 url = "http://mmb.irbbarcelona.org/api/pdb/" + pdb_code + ".cif" 

84 elif api_id == 'pdb': 

85 url = "https://files.rcsb.org/download/" + pdb_code + ".cif" 

86 elif api_id == 'pdbe': 

87 url = "https://www.ebi.ac.uk/pdbe/entry-files/download/" + pdb_code + ".cif" 

88 

89 fu.log("Downloading %s from: %s" % (pdb_code, url), out_log, global_log) 

90 return requests.get(url, verify=False).content.decode('utf-8') 

91 

92 

93def download_ligand(ligand_code, api_id, out_log=None, global_log=None): 

94 """ 

95 Returns: 

96 String: Content of the ligand file. 

97 """ 

98 

99 if api_id == 'mmb': 

100 url = "http://mmb.irbbarcelona.org/api/pdbMonomer/" + ligand_code.lower() 

101 text = requests.get(url, verify=False).content.decode('utf-8') 

102 elif api_id == 'pdbe': 

103 url = "ftp://ftp.ebi.ac.uk/pub/databases/msd/pdbechem_v2/{0}/{1}/{1}_ideal.pdb".format(ligand_code.upper()[0], ligand_code.upper()) 

104 text = urllib.request.urlopen(url).read().decode('utf-8') 

105 

106 fu.log("Downloading %s from: %s" % (ligand_code, url), out_log, global_log) 

107 

108 # removing useless empty lines at the end of the file 

109 text = os.linesep.join([s for s in text.splitlines() if s]) 

110 

111 return text 

112 

113 

114def download_fasta(pdb_code, api_id, out_log=None, global_log=None): 

115 """ 

116 Returns: 

117 String: Content of the fasta file. 

118 """ 

119 

120 if api_id == 'mmb': 

121 url = "http://mmb.irbbarcelona.org/api/pdb/" + pdb_code + ".fasta" 

122 elif api_id == 'pdb': 

123 url = "https://www.rcsb.org/fasta/entry/" + pdb_code 

124 elif api_id == 'pdbe': 

125 url = "https://www.ebi.ac.uk/pdbe/entry/pdb/" + pdb_code + "/fasta" 

126 

127 fu.log("Downloading %s from: %s" % (pdb_code, url), out_log, global_log) 

128 return requests.get(url, verify=False).content.decode('utf-8') 

129 

130 

131def download_drugbank(drugbank_id, url="https://www.drugbank.ca/structures/small_molecule_drugs/%s.sdf?type=3d", out_log=None, global_log=None): 

132 """ 

133 Returns: 

134 String: Content of the component file. 

135 """ 

136 url = (url % drugbank_id) 

137 

138 fu.log("Downloading %s from: %s" % (drugbank_id, url), out_log, global_log) 

139 

140 text = requests.get(url, verify=False).content.decode('utf-8') 

141 

142 return text 

143 

144 

145def download_binding_site(pdb_code, url="https://www.ebi.ac.uk/pdbe/api/pdb/entry/binding_sites/%s", out_log=None, global_log=None): 

146 """ 

147 Returns: 

148 String: Content of the component file. 

149 """ 

150 url = (url % pdb_code) 

151 

152 fu.log("Getting binding sites from: %s" % (url), out_log, global_log) 

153 

154 text = urllib.request.urlopen(url).read() 

155 json_obj = json.loads(text) 

156 json_string = json.dumps(json_obj, indent=4, sort_keys=True) 

157 # json_string = json.dumps(text, indent=4) 

158 

159 return json_string 

160 

161 

162def download_ideal_sdf(ligand_code, api_id, out_log=None, global_log=None): 

163 """ 

164 Returns: 

165 String: Content of the ideal sdf file. 

166 """ 

167 

168 if api_id == 'pdb': 

169 url = "https://files.rcsb.org/ligands/view/" + ligand_code.upper() + "_ideal.sdf" 

170 text = requests.get(url, verify=False).content.decode('utf-8') 

171 elif api_id == 'pdbe': 

172 url = "ftp://ftp.ebi.ac.uk/pub/databases/msd/pdbechem_v2/{0}/{1}/{1}_ideal.sdf".format(ligand_code.upper()[0], ligand_code.upper()) 

173 text = urllib.request.urlopen(url).read().decode('utf-8') 

174 

175 fu.log("Downloading %s from: %s" % (ligand_code, url), out_log, global_log) 

176 

177 return text 

178 

179 

180def download_str_info(pdb_code, url="http://mmb.irbbarcelona.org/api/pdb/%s.json", out_log=None, global_log=None): 

181 """ 

182 Returns: 

183 String: Content of the JSON file. 

184 """ 

185 url = (url % pdb_code) 

186 

187 fu.log("Getting structure info from: %s" % (url), out_log, global_log) 

188 

189 text = urllib.request.urlopen(url).read() 

190 json_obj = json.loads(text) 

191 json_string = json.dumps(json_obj, indent=4, sort_keys=True) 

192 # json_string = json.dumps(text, indent=4) 

193 

194 return json_string 

195 

196 

197def write_pdb(pdb_string, output_pdb_path, filt=None, out_log=None, global_log=None): 

198 """ Writes and filters a PDB """ 

199 fu.log("Writting pdb to: %s" % (output_pdb_path), out_log, global_log) 

200 with open(output_pdb_path, 'w') as output_pdb_file: 

201 if filt: 

202 fu.log("Filtering lines NOT starting with one of these words: %s" % str(filt), out_log, global_log) 

203 for line in pdb_string.splitlines(True): 

204 if line.strip().split()[0][0:6] in filt: 

205 output_pdb_file.write(line) 

206 else: 

207 output_pdb_file.write(pdb_string) 

208 

209 

210def write_mmcif(mmcif_string, output_mmcif_path, out_log=None, global_log=None): 

211 """ Writes a mmcif """ 

212 fu.log("Writting mmcif to: %s" % (output_mmcif_path), out_log, global_log) 

213 with open(output_mmcif_path, 'w') as output_mmcif_file: 

214 output_mmcif_file.write(mmcif_string) 

215 

216 

217def write_fasta(fasta_string, output_fasta_path, out_log=None, global_log=None): 

218 """ Writes a FASTA """ 

219 fu.log("Writting FASTA to: %s" % (output_fasta_path), out_log, global_log) 

220 with open(output_fasta_path, 'w') as output_fasta_file: 

221 output_fasta_file.write(fasta_string) 

222 

223 

224def write_sdf(sdf_string, output_sdf_path, out_log=None, global_log=None): 

225 """ Writes a SDF """ 

226 fu.log("Writting sdf to: %s" % (output_sdf_path), out_log, global_log) 

227 with open(output_sdf_path, 'w') as output_sdf_file: 

228 output_sdf_file.write(sdf_string) 

229 

230 

231def get_cluster_pdb_codes(pdb_code, cluster, out_log=None, global_log=None): 

232 """ 

233 Returns: 

234 String list: The list of pdb_codes of the selected cluster. 

235 """ 

236 url = "http://mmb.irbbarcelona.org/api/pdb/" 

237 pdb_codes = set() 

238 

239 url = url+pdb_code.lower()+'/clusters/cl-'+str(cluster)+".json" 

240 cluster_list = json.loads(requests.get(url, verify=False).content.decode('utf-8'))['clusterMembers'] 

241 for elem in cluster_list: 

242 pdb_codes.add(elem['_id'].lower()) 

243 

244 if out_log: 

245 out_log.info('Cluster: '+str(cluster)+' of pdb_code: '+pdb_code+'\n List: '+str(pdb_codes)) 

246 if global_log: 

247 global_log.info(fu.get_logs_prefix()+'Cluster: '+str(cluster)+' of pdb_code: '+pdb_code+'\n List: '+str(pdb_codes)) 

248 

249 return pdb_codes 

250 

251 

252def get_uniprot(pdb_code, url, out_log=None, global_log=None): 

253 """Returns the UNIPROT code corresponding to the `pdb_code`. 

254 

255 Returns: 

256 str: UNIPROT code. 

257 """ 

258 url_uniprot_id = (url+"/pdb/"+pdb_code.lower()+"/entry/uniprotRefs/_id") 

259 uniprot_id = requests.get(url_uniprot_id, verify=False).json()['uniprotRefs._id'][0] 

260 

261 if out_log: 

262 out_log.info('PDB code: '+pdb_code+' correspond to uniprot id: '+uniprot_id) 

263 if global_log: 

264 global_log.info('PDB code: '+pdb_code+' correspond to uniprot id: '+uniprot_id) 

265 

266 return uniprot_id 

267 

268 

269def get_variants(uniprot_id, url="http://mmb.irbbarcelona.org/api", out_log=None, global_log=None): 

270 """Returns the variants of the `uniprot_id` code. 

271 

272 Returns: 

273 :obj:`list` of :obj:`str`: List of variants. 

274 """ 

275 url_uniprot_mut = (url+"/uniprot/"+uniprot_id+"/entry/variants/vardata/mut/?varorig=humsavar") 

276 variants = requests.get(url_uniprot_mut, verify=False).json()['variants.vardata.mut'] 

277 variants = variants if variants else [] 

278 

279 fu.log('Found: %d variants for uniprot id: %s' % (len(variants), uniprot_id), out_log, global_log) 

280 return variants if variants else [] 

281 

282 

283def write_json(json_string, output_json_path, out_log=None, global_log=None): 

284 """ Writes a JSON """ 

285 fu.log("Writting json to: %s" % (output_json_path), out_log, global_log) 

286 with open(output_json_path, 'w') as output_json_file: 

287 output_json_file.write(json_string) 

288 

289 

290def get_memprotmd_sim_list(out_log=None, global_log=None): 

291 """ Returns all available membrane-protein systems (simulations) from the MemProtMD DB using its REST API """ 

292 

293 fu.log('Getting all available membrane-protein systems (simulations) from the MemProtMD REST API', out_log, global_log) 

294 

295 url = "http://memprotmd.bioch.ox.ac.uk/api/simulations/all" 

296 json_obj = requests.post(url).json() 

297 json_string = json.dumps(json_obj, indent=4) 

298 

299 fu.log('Total number of simulations: %d' % (len(json_obj)), out_log, global_log) 

300 

301 return json_string 

302 

303 

304def get_memprotmd_sim_search(collection_name, keyword, out_log=None, global_log=None): 

305 """ Performs advanced searches in the MemProtMD DB using its REST API and a given keyword """ 

306 

307 fu.log('Getting search results from the MemProtMD REST API. Collection name: %s, keyword: %s' % (collection_name, keyword), out_log, global_log) 

308 

309 url = "http://memprotmd.bioch.ox.ac.uk/api/search/advanced" 

310 json_query = { 

311 "collectionName": collection_name, 

312 "query": { 

313 "keywords": keyword 

314 }, 

315 "projection": { 

316 "simulations": 1 

317 }, 

318 "options": {} 

319 } 

320 

321 json_obj = requests.post(url, json=json_query).json() 

322 json_string = json.dumps(json_obj, indent=4) 

323 

324 # get total number of simulation 

325 list_kw = [] 

326 for sim_list in json_obj: 

327 for sim in sim_list['simulations']: 

328 list_kw.append(sim) 

329 

330 fu.log('Total number of simulations: %d' % (len(list_kw)), out_log, global_log) 

331 

332 return json_string 

333 

334 

335def get_memprotmd_sim(pdb_code, output_file, out_log=None, global_log=None): 

336 """ Gets a single simulation from MemProtMD DB """ 

337 

338 fu.log('Getting simulation file from pdb code %s' % (pdb_code), out_log, global_log) 

339 

340 url = "http://memprotmd.bioch.ox.ac.uk/data/memprotmd/simulations/" + pdb_code + "_default_dppc/files/run/at.zip" 

341 response = requests.get(url) 

342 

343 open(output_file, 'wb').write(response.content) 

344 

345 fu.log("Saving output %s file" % (output_file), out_log, global_log) 

346 

347 

348def check_mandatory_property(property, name, out_log, classname): 

349 """ Checks mandatory properties """ 

350 

351 if not property: 

352 fu.log(classname + ': Unexisting %s property, exiting' % name, out_log) 

353 raise SystemExit(classname + ': Unexisting %s property' % name) 

354 return property 

355 

356 

357def check_uniprot_code(code, out_log, classname): 

358 """ Checks uniprot code """ 

359 

360 pattern = re.compile((r"[OPQ][0-9][A-Z0-9]{3}[0-9]|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}")) 

361 

362 if not pattern.match(code): 

363 fu.log(classname + ': Incorrect uniprot code for %s' % code, out_log) 

364 raise SystemExit(classname + ': Incorrect uniprot code for %s' % code) 

365 

366 return True