Coverage for biobb_dna/interbp_correlations/interbpcorr.py: 84%

144 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 10:36 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the InterBasePairCorrelation class and the command line interface.""" 

4 

5import argparse 

6from itertools import product 

7from typing import Optional 

8 

9import matplotlib as mpl 

10import matplotlib.pyplot as plt 

11import numpy as np 

12import pandas as pd 

13from biobb_common.configuration import settings 

14from biobb_common.generic.biobb_object import BiobbObject 

15from biobb_common.tools.file_utils import launchlogger 

16 

17from biobb_dna.utils import constants 

18from biobb_dna.utils.common import _from_string_to_list 

19from biobb_dna.utils.loader import read_series 

20 

21 

22class InterBasePairCorrelation(BiobbObject): 

23 """ 

24 | biobb_dna InterBasePairCorrelation 

25 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter. 

26 | Calculate correlation between neighboring base pairs and pairs of helical parameters. 

27 

28 Args: 

29 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

30 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330). 

31 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330). 

32 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330). 

33 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330). 

34 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330). 

35 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

36 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

37 properties (dict): 

38 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

39 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

40 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

41 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

42 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

43 

44 Examples: 

45 This is a use example of how to use the building block from Python:: 

46 

47 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr 

48 

49 interbpcorr( 

50 input_filename_shift='path/to/input/shift.ser', 

51 input_filename_slide='path/to/input/slide.ser', 

52 input_filename_rise='path/to/input/slide.ser', 

53 input_filename_tilt='path/to/input/tilt.ser', 

54 input_filename_roll='path/to/input/roll.ser', 

55 input_filename_twist='path/to/input/twist.ser', 

56 output_csv_path='path/to/output/file.csv', 

57 output_jpg_path='path/to/output/plot.jpg', 

58 properties=prop) 

59 Info: 

60 * wrapped_software: 

61 * name: In house 

62 * license: Apache-2.0 

63 * ontology: 

64 * name: EDAM 

65 * schema: http://edamontology.org/EDAM.owl 

66 

67 """ 

68 

69 def __init__( 

70 self, 

71 input_filename_shift, 

72 input_filename_slide, 

73 input_filename_rise, 

74 input_filename_tilt, 

75 input_filename_roll, 

76 input_filename_twist, 

77 output_csv_path, 

78 output_jpg_path, 

79 properties=None, 

80 **kwargs, 

81 ) -> None: 

82 properties = properties or {} 

83 

84 # Call parent class constructor 

85 super().__init__(properties) 

86 self.locals_var_dict = locals().copy() 

87 

88 # Input/Output files 

89 self.io_dict = { 

90 "in": { 

91 "input_filename_shift": input_filename_shift, 

92 "input_filename_slide": input_filename_slide, 

93 "input_filename_rise": input_filename_rise, 

94 "input_filename_tilt": input_filename_tilt, 

95 "input_filename_roll": input_filename_roll, 

96 "input_filename_twist": input_filename_twist, 

97 }, 

98 "out": { 

99 "output_csv_path": output_csv_path, 

100 "output_jpg_path": output_jpg_path, 

101 }, 

102 } 

103 

104 self.properties = properties 

105 self.sequence = properties.get("sequence", None) 

106 self.seqpos = [ 

107 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

108 ] 

109 

110 # Check the properties 

111 self.check_properties(properties) 

112 self.check_arguments() 

113 

114 @launchlogger 

115 def launch(self) -> int: 

116 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object.""" 

117 

118 # Setup Biobb 

119 if self.check_restart(): 

120 return 0 

121 self.stage_files() 

122 

123 # check sequence 

124 if self.sequence is None or len(self.sequence) < 2: 

125 raise ValueError("sequence is null or too short!") 

126 

127 # check seqpos 

128 if self.seqpos: 

129 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

130 raise ValueError("seqpos must be a list of at least two integers") 

131 else: 

132 self.seqpos = None # type: ignore 

133 

134 # read input 

135 shift = read_series( 

136 self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos 

137 ) 

138 slide = read_series( 

139 self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos 

140 ) 

141 rise = read_series( 

142 self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos 

143 ) 

144 tilt = read_series( 

145 self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos 

146 ) 

147 roll = read_series( 

148 self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos 

149 ) 

150 twist = read_series( 

151 self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos 

152 ) 

153 

154 if not self.seqpos: 

155 # drop first and last columns 

156 shift = shift[shift.columns[1:-2]] 

157 slide = slide[slide.columns[1:-2]] 

158 rise = rise[rise.columns[1:-2]] 

159 tilt = tilt[tilt.columns[1:-2]] 

160 roll = roll[roll.columns[1:-2]] 

161 twist = twist[twist.columns[1:-2]] 

162 labels = [ 

163 f"{i+1}_{self.sequence[i:i+2]}" 

164 for i in range(1, len(shift.columns) + 1) 

165 ] 

166 corr_index = [ 

167 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1) 

168 ] 

169 else: 

170 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos] 

171 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos] 

172 

173 # rename duplicated subunits 

174 shift.columns = labels 

175 slide.columns = labels 

176 rise.columns = labels 

177 tilt.columns = labels 

178 roll.columns = labels 

179 twist.columns = labels 

180 

181 # set names to each dataset 

182 shift.name = "shift" 

183 slide.name = "slide" 

184 rise.name = "rise" 

185 tilt.name = "tilt" 

186 roll.name = "roll" 

187 twist.name = "twist" 

188 

189 # get correlation between neighboring basepairs among all helical parameters 

190 results = {} 

191 datasets = [shift, slide, rise, tilt, roll, twist] 

192 for ser1, ser2 in product(datasets, datasets): 

193 ser2_shifted = ser2.shift(axis=1) 

194 ser2_shifted[labels[0]] = ser2[labels[-1]] 

195 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular: 

196 method = self.circular 

197 elif ( 

198 ser1.name in constants.hp_angular and ser2.name not in constants.hp_angular 

199 ) or ( 

200 ser2.name in constants.hp_angular and ser1.name not in constants.hp_angular 

201 ): 

202 method = self.circlineal 

203 else: 

204 method = "pearson" # type: ignore 

205 corr_data = ser1.corrwith(ser2_shifted, method=method) 

206 corr_data.index = corr_index 

207 results[f"{ser1.name}/{ser2.name}"] = corr_data 

208 result_df = pd.DataFrame.from_dict(results) 

209 result_df.index = corr_index # type: ignore 

210 

211 # save csv data 

212 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"]) 

213 

214 # create heatmap 

215 cmap = plt.get_cmap("bwr").copy() 

216 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1] 

217 num = cmap.N 

218 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore 

219 cmap.set_bad(color="gainsboro") 

220 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True) 

221 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto") 

222 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]) 

223 

224 # axes 

225 xlocs = np.arange(len(result_df.columns)) 

226 _ = ax.set_xticks(xlocs) 

227 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90) 

228 

229 ylocs = np.arange(len(result_df.index)) 

230 _ = ax.set_yticks(ylocs) 

231 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore 

232 

233 ax.set_title( 

234 "Correlation for neighboring basepairs " "and pairs of helical parameters" 

235 ) 

236 

237 fig.tight_layout() 

238 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") 

239 plt.close() 

240 

241 # Copy files to host 

242 self.copy_to_host() 

243 

244 # Remove temporary file(s) 

245 # self.tmp_files.extend([ 

246 # self.stage_io_dict.get("unique_dir", ""), 

247 # ]) 

248 self.remove_tmp_files() 

249 

250 self.check_arguments(output_files_created=True, raise_exception=False) 

251 

252 return 0 

253 

254 @staticmethod 

255 def circular(x1, x2): 

256 x1 = x1 * np.pi / 180 

257 x2 = x2 * np.pi / 180 

258 diff_1 = np.sin(x1 - x1.mean()) 

259 diff_2 = np.sin(x2 - x2.mean()) 

260 num = (diff_1 * diff_2).sum() 

261 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) 

262 return num / den 

263 

264 @staticmethod 

265 def circlineal(x1, x2): 

266 x2 = x2 * np.pi / 180 

267 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

268 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

269 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

270 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs 

271 den = 1 - (rcs**2) 

272 correlation = np.sqrt(num / den) 

273 if np.corrcoef(x1, x2)[1, 0] < 0: 

274 correlation *= -1 

275 return correlation 

276 

277 

278def interbpcorr( 

279 input_filename_shift: str, 

280 input_filename_slide: str, 

281 input_filename_rise: str, 

282 input_filename_tilt: str, 

283 input_filename_roll: str, 

284 input_filename_twist: str, 

285 output_csv_path: str, 

286 output_jpg_path: str, 

287 properties: Optional[dict] = None, 

288 **kwargs, 

289) -> int: 

290 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and 

291 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method.""" 

292 

293 return InterBasePairCorrelation( 

294 input_filename_shift=input_filename_shift, 

295 input_filename_slide=input_filename_slide, 

296 input_filename_rise=input_filename_rise, 

297 input_filename_tilt=input_filename_tilt, 

298 input_filename_roll=input_filename_roll, 

299 input_filename_twist=input_filename_twist, 

300 output_csv_path=output_csv_path, 

301 output_jpg_path=output_jpg_path, 

302 properties=properties, 

303 **kwargs, 

304 ).launch() 

305 

306 interbpcorr.__doc__ = InterBasePairCorrelation.__doc__ 

307 

308 

309def main(): 

310 """Command line execution of this building block. Please check the command line documentation.""" 

311 parser = argparse.ArgumentParser( 

312 description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", 

313 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), 

314 ) 

315 parser.add_argument("--config", required=False, help="Configuration file") 

316 

317 required_args = parser.add_argument_group("required arguments") 

318 required_args.add_argument( 

319 "--input_filename_shift", 

320 required=True, 

321 help="Path to ser file for helical parameter shift. Accepted formats: ser.", 

322 ) 

323 required_args.add_argument( 

324 "--input_filename_slide", 

325 required=True, 

326 help="Path to ser file for helical parameter slide. Accepted formats: ser.", 

327 ) 

328 required_args.add_argument( 

329 "--input_filename_rise", 

330 required=True, 

331 help="Path to ser file for helical parameter rise. Accepted formats: ser.", 

332 ) 

333 required_args.add_argument( 

334 "--input_filename_tilt", 

335 required=True, 

336 help="Path to ser file for helical parameter tilt. Accepted formats: ser.", 

337 ) 

338 required_args.add_argument( 

339 "--input_filename_roll", 

340 required=True, 

341 help="Path to ser file for helical parameter roll. Accepted formats: ser.", 

342 ) 

343 required_args.add_argument( 

344 "--input_filename_twist", 

345 required=True, 

346 help="Path to ser file for helical parameter twist. Accepted formats: ser.", 

347 ) 

348 required_args.add_argument( 

349 "--output_csv_path", 

350 required=True, 

351 help="Path to output file. Accepted formats: csv.", 

352 ) 

353 required_args.add_argument( 

354 "--output_jpg_path", 

355 required=True, 

356 help="Path to output plot. Accepted formats: jpg.", 

357 ) 

358 

359 args = parser.parse_args() 

360 args.config = args.config or "{}" 

361 properties = settings.ConfReader(config=args.config).get_prop_dic() 

362 

363 interbpcorr( 

364 input_filename_shift=args.input_filename_shift, 

365 input_filename_slide=args.input_filename_slide, 

366 input_filename_rise=args.input_filename_rise, 

367 input_filename_tilt=args.input_filename_tilt, 

368 input_filename_roll=args.input_filename_roll, 

369 input_filename_twist=args.input_filename_twist, 

370 output_csv_path=args.output_csv_path, 

371 output_jpg_path=args.output_jpg_path, 

372 properties=properties, 

373 ) 

374 

375 

376if __name__ == "__main__": 

377 main()