Coverage for biobb_dna / intrabp_correlations / intrabpcorr.py: 94%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-15 18:49 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the IntraBasePairCorrelation class and the command line interface.""" 

4from itertools import product 

5from typing import Optional 

6 

7import matplotlib as mpl 

8import matplotlib.pyplot as plt 

9import numpy as np 

10import pandas as pd 

11from biobb_common.generic.biobb_object import BiobbObject 

12from biobb_common.tools.file_utils import launchlogger 

13 

14from biobb_dna.utils import constants 

15from biobb_dna.utils.common import _from_string_to_list 

16from biobb_dna.utils.loader import read_series 

17 

18 

19class IntraBasePairCorrelation(BiobbObject): 

20 """ 

21 | biobb_dna IntraBasePairCorrelation 

22 | Calculate correlation between all intra-base pairs of a single sequence and for a single helical parameter. 

23 | Calculate correlation between neighboring base pairs and pairs of helical parameters. 

24 

25 Args: 

26 input_filename_shear (str): Path to .ser file with data for helical parameter 'shear'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shear.ser>`_. Accepted formats: ser (edam:format_2330). 

27 input_filename_stretch (str): Path to .ser file with data for helical parameter 'stretch'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_stretch.ser>`_. Accepted formats: ser (edam:format_2330). 

28 input_filename_stagger (str): Path to .ser file with data for helical parameter 'stagger'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_stagger.ser>`_. Accepted formats: ser (edam:format_2330). 

29 input_filename_buckle (str): Path to .ser file with data for helical parameter 'buckle'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_buckle.ser>`_. Accepted formats: ser (edam:format_2330). 

30 input_filename_propel (str): Path to .ser file with data for helical parameter 'propel'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_propel.ser>`_. Accepted formats: ser (edam:format_2330). 

31 input_filename_opening (str): Path to .ser file with data for helical parameter 'opening'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_opening.ser>`_. Accepted formats: ser (edam:format_2330). 

32 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

33 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

34 properties (dict): 

35 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

36 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

37 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

38 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

39 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

40 

41 Examples: 

42 This is a use example of how to use the building block from Python:: 

43 

44 from biobb_dna.intrabp_correlations.intrabpcorr import intrabpcorr 

45 

46 intrabpcorr( 

47 input_filename_shear='path/to/input/shear.ser', 

48 input_filename_stretch='path/to/input/stretch.ser', 

49 input_filename_stagger='path/to/input/stagger.ser', 

50 input_filename_buckle='path/to/input/buckle.ser', 

51 input_filename_propel='path/to/input/propel.ser', 

52 input_filename_opening='path/to/input/opening.ser', 

53 output_csv_path='path/to/output/file.csv', 

54 output_jpg_path='path/to/output/plot.jpg', 

55 properties=prop) 

56 Info: 

57 * wrapped_software: 

58 * name: In house 

59 * license: Apache-2.0 

60 * ontology: 

61 * name: EDAM 

62 * schema: http://edamontology.org/EDAM.owl 

63 

64 """ 

65 

66 def __init__( 

67 self, 

68 input_filename_shear, 

69 input_filename_stretch, 

70 input_filename_stagger, 

71 input_filename_buckle, 

72 input_filename_propel, 

73 input_filename_opening, 

74 output_csv_path, 

75 output_jpg_path, 

76 properties=None, 

77 **kwargs, 

78 ) -> None: 

79 properties = properties or {} 

80 

81 # Call parent class constructor 

82 super().__init__(properties) 

83 self.locals_var_dict = locals().copy() 

84 

85 # Input/Output files 

86 self.io_dict = { 

87 "in": { 

88 "input_filename_shear": input_filename_shear, 

89 "input_filename_stretch": input_filename_stretch, 

90 "input_filename_stagger": input_filename_stagger, 

91 "input_filename_buckle": input_filename_buckle, 

92 "input_filename_propel": input_filename_propel, 

93 "input_filename_opening": input_filename_opening, 

94 }, 

95 "out": { 

96 "output_csv_path": output_csv_path, 

97 "output_jpg_path": output_jpg_path, 

98 }, 

99 } 

100 

101 self.properties = properties 

102 self.sequence = properties.get("sequence", None) 

103 self.seqpos = [ 

104 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

105 ] 

106 

107 # Check the properties 

108 self.check_properties(properties) 

109 self.check_arguments() 

110 

111 @launchlogger 

112 def launch(self) -> int: 

113 """Execute the :class:`HelParCorrelation <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation>` object.""" 

114 

115 # Setup Biobb 

116 if self.check_restart(): 

117 return 0 

118 self.stage_files() 

119 

120 # check sequence 

121 if self.sequence is None or len(self.sequence) < 2: 

122 raise ValueError("sequence is null or too short!") 

123 

124 # check seqpos 

125 if self.seqpos: 

126 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

127 raise ValueError("seqpos must be a list of at least two integers") 

128 else: 

129 self.seqpos = None # type: ignore 

130 

131 # read input 

132 shear = read_series( 

133 self.stage_io_dict["in"]["input_filename_shear"], usecols=self.seqpos 

134 ) 

135 stretch = read_series( 

136 self.stage_io_dict["in"]["input_filename_stretch"], usecols=self.seqpos 

137 ) 

138 stagger = read_series( 

139 self.stage_io_dict["in"]["input_filename_stagger"], usecols=self.seqpos 

140 ) 

141 buckle = read_series( 

142 self.stage_io_dict["in"]["input_filename_buckle"], usecols=self.seqpos 

143 ) 

144 propel = read_series( 

145 self.stage_io_dict["in"]["input_filename_propel"], usecols=self.seqpos 

146 ) 

147 opening = read_series( 

148 self.stage_io_dict["in"]["input_filename_opening"], usecols=self.seqpos 

149 ) 

150 

151 if not self.seqpos: 

152 # drop first and last columns 

153 shear = shear[shear.columns[1:-1]] 

154 stretch = stretch[stretch.columns[1:-1]] 

155 stagger = stagger[stagger.columns[1:-1]] 

156 buckle = buckle[buckle.columns[1:-1]] 

157 propel = propel[propel.columns[1:-1]] 

158 opening = opening[opening.columns[1:-1]] 

159 labels = [ 

160 f"{i+1}_{self.sequence[i:i+1]}" 

161 for i in range(1, len(shear.columns) + 1) 

162 ] 

163 corr_index = [ 

164 f"{self.sequence[i:i+2]}" for i in range(1, len(shear.columns) + 1) 

165 ] 

166 else: 

167 labels = [f"{i+1}_{self.sequence[i:i+1]}" for i in self.seqpos] 

168 corr_index = [f"{self.sequence[i:i+2]}" for i in self.seqpos] 

169 

170 # rename duplicated subunits 

171 shear.columns = labels 

172 stretch.columns = labels 

173 stagger.columns = labels 

174 buckle.columns = labels 

175 propel.columns = labels 

176 opening.columns = labels 

177 

178 # set names to each dataset 

179 shear.name = "shear" 

180 stretch.name = "stretch" 

181 stagger.name = "stagger" 

182 buckle.name = "buckle" 

183 propel.name = "propel" 

184 opening.name = "opening" 

185 

186 # get correlation between neighboring basepairs among all helical parameters 

187 results = {} 

188 datasets = [shear, stretch, stagger, buckle, propel, opening] 

189 for ser1, ser2 in product(datasets, datasets): 

190 ser2_shifted = ser2.shift(axis=1) 

191 ser2_shifted[labels[0]] = ser2[labels[-1]] 

192 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular: 

193 method = self.circular 

194 elif ( 

195 ser1.name in constants.hp_angular and ser2.name not in constants.hp_angular 

196 ) or ( 

197 ser2.name in constants.hp_angular and ser1.name not in constants.hp_angular 

198 ): 

199 method = self.circlineal 

200 else: 

201 method = "pearson" # type: ignore 

202 corr_data = ser1.corrwith(ser2_shifted, method=method) 

203 corr_data.index = corr_index 

204 results[f"{ser1.name}/{ser2.name}"] = corr_data 

205 result_df = pd.DataFrame.from_dict(results) 

206 result_df.index = corr_index # type: ignore 

207 

208 # save csv data 

209 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"]) 

210 

211 # create heatmap 

212 cmap = plt.get_cmap("bwr").copy() 

213 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1] 

214 num = cmap.N 

215 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore 

216 cmap.set_bad(color="gainsboro") 

217 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True) 

218 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto") 

219 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]) 

220 

221 # axes 

222 xlocs = np.arange(len(result_df.columns)) 

223 _ = ax.set_xticks(xlocs) 

224 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90) 

225 

226 ylocs = np.arange(len(result_df.index)) 

227 _ = ax.set_yticks(ylocs) 

228 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore 

229 

230 ax.set_title( 

231 "Correlation for neighboring basepairs " "and pairs of helical parameters" 

232 ) 

233 

234 fig.tight_layout() 

235 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") 

236 plt.close() 

237 

238 # Copy files to host 

239 self.copy_to_host() 

240 

241 # Remove temporary file(s) 

242 self.remove_tmp_files() 

243 

244 self.check_arguments(output_files_created=True, raise_exception=False) 

245 

246 return 0 

247 

248 @staticmethod 

249 def circular(x1, x2): 

250 x1 = x1 * np.pi / 180 

251 x2 = x2 * np.pi / 180 

252 diff_1 = np.sin(x1 - x1.mean()) 

253 diff_2 = np.sin(x2 - x2.mean()) 

254 num = (diff_1 * diff_2).sum() 

255 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) 

256 return num / den 

257 

258 @staticmethod 

259 def circlineal(x1, x2): 

260 x2 = x2 * np.pi / 180 

261 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

262 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

263 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

264 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs 

265 den = 1 - (rcs**2) 

266 correlation = np.sqrt(num / den) 

267 if np.corrcoef(x1, x2)[1, 0] < 0: 

268 correlation *= -1 

269 return correlation 

270 

271 

272def intrabpcorr( 

273 input_filename_shear: str, 

274 input_filename_stretch: str, 

275 input_filename_stagger: str, 

276 input_filename_buckle: str, 

277 input_filename_propel: str, 

278 input_filename_opening: str, 

279 output_csv_path: str, 

280 output_jpg_path: str, 

281 properties: Optional[dict] = None, 

282 **kwargs, 

283) -> int: 

284 """Create :class:`HelParCorrelation <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation>` class and 

285 execute the :meth:`launch() <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation.launch>` method.""" 

286 return IntraBasePairCorrelation(**dict(locals())).launch() 

287 

288 

289intrabpcorr.__doc__ = IntraBasePairCorrelation.__doc__ 

290main = IntraBasePairCorrelation.get_main(intrabpcorr, "Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.") 

291 

292if __name__ == '__main__': 

293 main()