Coverage for biobb_dna/intrabp_correlations/intrabpcorr.py: 84%

144 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 10:36 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the IntraBasePairCorrelation class and the command line interface.""" 

4 

5import argparse 

6from itertools import product 

7from typing import Optional 

8 

9import matplotlib as mpl 

10import matplotlib.pyplot as plt 

11import numpy as np 

12import pandas as pd 

13from biobb_common.configuration import settings 

14from biobb_common.generic.biobb_object import BiobbObject 

15from biobb_common.tools.file_utils import launchlogger 

16 

17from biobb_dna.utils import constants 

18from biobb_dna.utils.common import _from_string_to_list 

19from biobb_dna.utils.loader import read_series 

20 

21 

22class IntraBasePairCorrelation(BiobbObject): 

23 """ 

24 | biobb_dna IntraBasePairCorrelation 

25 | Calculate correlation between all intra-base pairs of a single sequence and for a single helical parameter. 

26 | Calculate correlation between neighboring base pairs and pairs of helical parameters. 

27 

28 Args: 

29 input_filename_shear (str): Path to .ser file with data for helical parameter 'shear'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shear.ser>`_. Accepted formats: ser (edam:format_2330). 

30 input_filename_stretch (str): Path to .ser file with data for helical parameter 'stretch'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_stretch.ser>`_. Accepted formats: ser (edam:format_2330). 

31 input_filename_stagger (str): Path to .ser file with data for helical parameter 'stagger'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_stagger.ser>`_. Accepted formats: ser (edam:format_2330). 

32 input_filename_buckle (str): Path to .ser file with data for helical parameter 'buckle'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_buckle.ser>`_. Accepted formats: ser (edam:format_2330). 

33 input_filename_propel (str): Path to .ser file with data for helical parameter 'propel'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_propel.ser>`_. Accepted formats: ser (edam:format_2330). 

34 input_filename_opening (str): Path to .ser file with data for helical parameter 'opening'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_opening.ser>`_. Accepted formats: ser (edam:format_2330). 

35 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

36 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

37 properties (dict): 

38 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

39 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

40 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

41 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

42 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

43 

44 Examples: 

45 This is a use example of how to use the building block from Python:: 

46 

47 from biobb_dna.intrabp_correlations.intrabpcorr import intrabpcorr 

48 

49 intrabpcorr( 

50 input_filename_shear='path/to/input/shear.ser', 

51 input_filename_stretch='path/to/input/stretch.ser', 

52 input_filename_stagger='path/to/input/stagger.ser', 

53 input_filename_buckle='path/to/input/buckle.ser', 

54 input_filename_propel='path/to/input/propel.ser', 

55 input_filename_opening='path/to/input/opening.ser', 

56 output_csv_path='path/to/output/file.csv', 

57 output_jpg_path='path/to/output/plot.jpg', 

58 properties=prop) 

59 Info: 

60 * wrapped_software: 

61 * name: In house 

62 * license: Apache-2.0 

63 * ontology: 

64 * name: EDAM 

65 * schema: http://edamontology.org/EDAM.owl 

66 

67 """ 

68 

69 def __init__( 

70 self, 

71 input_filename_shear, 

72 input_filename_stretch, 

73 input_filename_stagger, 

74 input_filename_buckle, 

75 input_filename_propel, 

76 input_filename_opening, 

77 output_csv_path, 

78 output_jpg_path, 

79 properties=None, 

80 **kwargs, 

81 ) -> None: 

82 properties = properties or {} 

83 

84 # Call parent class constructor 

85 super().__init__(properties) 

86 self.locals_var_dict = locals().copy() 

87 

88 # Input/Output files 

89 self.io_dict = { 

90 "in": { 

91 "input_filename_shear": input_filename_shear, 

92 "input_filename_stretch": input_filename_stretch, 

93 "input_filename_stagger": input_filename_stagger, 

94 "input_filename_buckle": input_filename_buckle, 

95 "input_filename_propel": input_filename_propel, 

96 "input_filename_opening": input_filename_opening, 

97 }, 

98 "out": { 

99 "output_csv_path": output_csv_path, 

100 "output_jpg_path": output_jpg_path, 

101 }, 

102 } 

103 

104 self.properties = properties 

105 self.sequence = properties.get("sequence", None) 

106 self.seqpos = [ 

107 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

108 ] 

109 

110 # Check the properties 

111 self.check_properties(properties) 

112 self.check_arguments() 

113 

114 @launchlogger 

115 def launch(self) -> int: 

116 """Execute the :class:`HelParCorrelation <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation>` object.""" 

117 

118 # Setup Biobb 

119 if self.check_restart(): 

120 return 0 

121 self.stage_files() 

122 

123 # check sequence 

124 if self.sequence is None or len(self.sequence) < 2: 

125 raise ValueError("sequence is null or too short!") 

126 

127 # check seqpos 

128 if self.seqpos: 

129 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

130 raise ValueError("seqpos must be a list of at least two integers") 

131 else: 

132 self.seqpos = None # type: ignore 

133 

134 # read input 

135 shear = read_series( 

136 self.stage_io_dict["in"]["input_filename_shear"], usecols=self.seqpos 

137 ) 

138 stretch = read_series( 

139 self.stage_io_dict["in"]["input_filename_stretch"], usecols=self.seqpos 

140 ) 

141 stagger = read_series( 

142 self.stage_io_dict["in"]["input_filename_stagger"], usecols=self.seqpos 

143 ) 

144 buckle = read_series( 

145 self.stage_io_dict["in"]["input_filename_buckle"], usecols=self.seqpos 

146 ) 

147 propel = read_series( 

148 self.stage_io_dict["in"]["input_filename_propel"], usecols=self.seqpos 

149 ) 

150 opening = read_series( 

151 self.stage_io_dict["in"]["input_filename_opening"], usecols=self.seqpos 

152 ) 

153 

154 if not self.seqpos: 

155 # drop first and last columns 

156 shear = shear[shear.columns[1:-1]] 

157 stretch = stretch[stretch.columns[1:-1]] 

158 stagger = stagger[stagger.columns[1:-1]] 

159 buckle = buckle[buckle.columns[1:-1]] 

160 propel = propel[propel.columns[1:-1]] 

161 opening = opening[opening.columns[1:-1]] 

162 labels = [ 

163 f"{i+1}_{self.sequence[i:i+1]}" 

164 for i in range(1, len(shear.columns) + 1) 

165 ] 

166 corr_index = [ 

167 f"{self.sequence[i:i+2]}" for i in range(1, len(shear.columns) + 1) 

168 ] 

169 else: 

170 labels = [f"{i+1}_{self.sequence[i:i+1]}" for i in self.seqpos] 

171 corr_index = [f"{self.sequence[i:i+2]}" for i in self.seqpos] 

172 

173 # rename duplicated subunits 

174 shear.columns = labels 

175 stretch.columns = labels 

176 stagger.columns = labels 

177 buckle.columns = labels 

178 propel.columns = labels 

179 opening.columns = labels 

180 

181 # set names to each dataset 

182 shear.name = "shear" 

183 stretch.name = "stretch" 

184 stagger.name = "stagger" 

185 buckle.name = "buckle" 

186 propel.name = "propel" 

187 opening.name = "opening" 

188 

189 # get correlation between neighboring basepairs among all helical parameters 

190 results = {} 

191 datasets = [shear, stretch, stagger, buckle, propel, opening] 

192 for ser1, ser2 in product(datasets, datasets): 

193 ser2_shifted = ser2.shift(axis=1) 

194 ser2_shifted[labels[0]] = ser2[labels[-1]] 

195 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular: 

196 method = self.circular 

197 elif ( 

198 ser1.name in constants.hp_angular and ser2.name not in constants.hp_angular 

199 ) or ( 

200 ser2.name in constants.hp_angular and ser1.name not in constants.hp_angular 

201 ): 

202 method = self.circlineal 

203 else: 

204 method = "pearson" # type: ignore 

205 corr_data = ser1.corrwith(ser2_shifted, method=method) 

206 corr_data.index = corr_index 

207 results[f"{ser1.name}/{ser2.name}"] = corr_data 

208 result_df = pd.DataFrame.from_dict(results) 

209 result_df.index = corr_index # type: ignore 

210 

211 # save csv data 

212 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"]) 

213 

214 # create heatmap 

215 cmap = plt.get_cmap("bwr").copy() 

216 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1] 

217 num = cmap.N 

218 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore 

219 cmap.set_bad(color="gainsboro") 

220 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True) 

221 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto") 

222 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]) 

223 

224 # axes 

225 xlocs = np.arange(len(result_df.columns)) 

226 _ = ax.set_xticks(xlocs) 

227 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90) 

228 

229 ylocs = np.arange(len(result_df.index)) 

230 _ = ax.set_yticks(ylocs) 

231 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore 

232 

233 ax.set_title( 

234 "Correlation for neighboring basepairs " "and pairs of helical parameters" 

235 ) 

236 

237 fig.tight_layout() 

238 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") 

239 plt.close() 

240 

241 # Copy files to host 

242 self.copy_to_host() 

243 

244 # Remove temporary file(s) 

245 # self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) 

246 self.remove_tmp_files() 

247 

248 self.check_arguments(output_files_created=True, raise_exception=False) 

249 

250 return 0 

251 

252 @staticmethod 

253 def circular(x1, x2): 

254 x1 = x1 * np.pi / 180 

255 x2 = x2 * np.pi / 180 

256 diff_1 = np.sin(x1 - x1.mean()) 

257 diff_2 = np.sin(x2 - x2.mean()) 

258 num = (diff_1 * diff_2).sum() 

259 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) 

260 return num / den 

261 

262 @staticmethod 

263 def circlineal(x1, x2): 

264 x2 = x2 * np.pi / 180 

265 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

266 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

267 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

268 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs 

269 den = 1 - (rcs**2) 

270 correlation = np.sqrt(num / den) 

271 if np.corrcoef(x1, x2)[1, 0] < 0: 

272 correlation *= -1 

273 return correlation 

274 

275 

276def intrabpcorr( 

277 input_filename_shear: str, 

278 input_filename_stretch: str, 

279 input_filename_stagger: str, 

280 input_filename_buckle: str, 

281 input_filename_propel: str, 

282 input_filename_opening: str, 

283 output_csv_path: str, 

284 output_jpg_path: str, 

285 properties: Optional[dict] = None, 

286 **kwargs, 

287) -> int: 

288 """Create :class:`HelParCorrelation <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation>` class and 

289 execute the :meth:`launch() <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation.launch>` method.""" 

290 

291 return IntraBasePairCorrelation( 

292 input_filename_shear=input_filename_shear, 

293 input_filename_stretch=input_filename_stretch, 

294 input_filename_stagger=input_filename_stagger, 

295 input_filename_buckle=input_filename_buckle, 

296 input_filename_propel=input_filename_propel, 

297 input_filename_opening=input_filename_opening, 

298 output_csv_path=output_csv_path, 

299 output_jpg_path=output_jpg_path, 

300 properties=properties, 

301 **kwargs, 

302 ).launch() 

303 

304 intrabpcorr.__doc__ = IntraBasePairCorrelation.__doc__ 

305 

306 

307def main(): 

308 """Command line execution of this building block. Please check the command line documentation.""" 

309 parser = argparse.ArgumentParser( 

310 description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", 

311 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), 

312 ) 

313 parser.add_argument("--config", required=False, help="Configuration file") 

314 

315 required_args = parser.add_argument_group("required arguments") 

316 required_args.add_argument( 

317 "--input_filename_shear", 

318 required=True, 

319 help="Path to ser file for helical parameter shear. Accepted formats: ser.", 

320 ) 

321 required_args.add_argument( 

322 "--input_filename_stretch", 

323 required=True, 

324 help="Path to ser file for helical parameter stretch. Accepted formats: ser.", 

325 ) 

326 required_args.add_argument( 

327 "--input_filename_stagger", 

328 required=True, 

329 help="Path to ser file for helical parameter stagger. Accepted formats: ser.", 

330 ) 

331 required_args.add_argument( 

332 "--input_filename_buckle", 

333 required=True, 

334 help="Path to ser file for helical parameter buckle. Accepted formats: ser.", 

335 ) 

336 required_args.add_argument( 

337 "--input_filename_propel", 

338 required=True, 

339 help="Path to ser file for helical parameter propel. Accepted formats: ser.", 

340 ) 

341 required_args.add_argument( 

342 "--input_filename_opening", 

343 required=True, 

344 help="Path to ser file for helical parameter opening. Accepted formats: ser.", 

345 ) 

346 required_args.add_argument( 

347 "--output_csv_path", 

348 required=True, 

349 help="Path to output file. Accepted formats: csv.", 

350 ) 

351 required_args.add_argument( 

352 "--output_jpg_path", 

353 required=True, 

354 help="Path to output plot. Accepted formats: jpg.", 

355 ) 

356 

357 args = parser.parse_args() 

358 args.config = args.config or "{}" 

359 properties = settings.ConfReader(config=args.config).get_prop_dic() 

360 

361 intrabpcorr( 

362 input_filename_shear=args.input_filename_shear, 

363 input_filename_stretch=args.input_filename_stretch, 

364 input_filename_stagger=args.input_filename_stagger, 

365 input_filename_buckle=args.input_filename_buckle, 

366 input_filename_propel=args.input_filename_propel, 

367 input_filename_opening=args.input_filename_opening, 

368 output_csv_path=args.output_csv_path, 

369 output_jpg_path=args.output_jpg_path, 

370 properties=properties, 

371 ) 

372 

373 

374if __name__ == "__main__": 

375 main()