Coverage for biobb_dna/interbp_correlations/interbpcorr.py: 85%

144 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the InterBasePairCorrelation class and the command line interface.""" 

4import argparse 

5from itertools import product 

6 

7import numpy as np 

8import pandas as pd 

9import matplotlib as mpl 

10import matplotlib.pyplot as plt 

11 

12from biobb_common.generic.biobb_object import BiobbObject 

13from biobb_common.configuration import settings 

14from biobb_common.tools import file_utils as fu 

15from biobb_common.tools.file_utils import launchlogger 

16from biobb_dna.utils.loader import read_series 

17from biobb_dna.utils import constants 

18 

19 

20class InterBasePairCorrelation(BiobbObject): 

21 """ 

22 | biobb_dna InterBasePairCorrelation 

23 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter. 

24 

25 Args: 

26 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

27 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330). 

28 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330). 

29 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330). 

30 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330). 

31 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330). 

32 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

33 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

34 properties (dict): 

35 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

36 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

37 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

38 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

39 

40 Examples: 

41 This is a use example of how to use the building block from Python:: 

42 

43 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr 

44 

45 interbpcorr( 

46 input_filename_shift='path/to/input/shift.ser', 

47 input_filename_slide='path/to/input/slide.ser', 

48 input_filename_rise='path/to/input/slide.ser', 

49 input_filename_tilt='path/to/input/tilt.ser', 

50 input_filename_roll='path/to/input/roll.ser', 

51 input_filename_twist='path/to/input/twist.ser', 

52 output_csv_path='path/to/output/file.csv', 

53 output_jpg_path='path/to/output/plot.jpg', 

54 properties=prop) 

55 Info: 

56 * wrapped_software: 

57 * name: In house 

58 * license: Apache-2.0 

59 * ontology: 

60 * name: EDAM 

61 * schema: http://edamontology.org/EDAM.owl 

62 

63 """ 

64 

65 def __init__( 

66 self, input_filename_shift, input_filename_slide, 

67 input_filename_rise, input_filename_tilt, 

68 input_filename_roll, input_filename_twist, 

69 output_csv_path, output_jpg_path, 

70 properties=None, **kwargs) -> None: 

71 properties = properties or {} 

72 

73 # Call parent class constructor 

74 super().__init__(properties) 

75 self.locals_var_dict = locals().copy() 

76 

77 # Input/Output files 

78 self.io_dict = { 

79 'in': { 

80 'input_filename_shift': input_filename_shift, 

81 'input_filename_slide': input_filename_slide, 

82 'input_filename_rise': input_filename_rise, 

83 'input_filename_tilt': input_filename_tilt, 

84 'input_filename_roll': input_filename_roll, 

85 'input_filename_twist': input_filename_twist 

86 }, 

87 'out': { 

88 'output_csv_path': output_csv_path, 

89 'output_jpg_path': output_jpg_path 

90 } 

91 } 

92 

93 self.properties = properties 

94 self.sequence = properties.get("sequence", None) 

95 self.seqpos = properties.get("seqpos", None) 

96 

97 # Check the properties 

98 self.check_properties(properties) 

99 self.check_arguments() 

100 

101 @launchlogger 

102 def launch(self) -> int: 

103 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object.""" 

104 

105 # Setup Biobb 

106 if self.check_restart(): 

107 return 0 

108 self.stage_files() 

109 

110 # check sequence 

111 if self.sequence is None or len(self.sequence) < 2: 

112 raise ValueError("sequence is null or too short!") 

113 

114 # check seqpos 

115 if self.seqpos is not None: 

116 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

117 raise ValueError( 

118 "seqpos must be a list of at least two integers") 

119 

120 # Creating temporary folder 

121 self.tmp_folder = fu.create_unique_dir(prefix="bpcorrelation_") 

122 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log) 

123 

124 # read input 

125 shift = read_series( 

126 self.io_dict["in"]["input_filename_shift"], usecols=self.seqpos) 

127 slide = read_series( 

128 self.io_dict["in"]["input_filename_slide"], usecols=self.seqpos) 

129 rise = read_series( 

130 self.io_dict["in"]["input_filename_rise"], usecols=self.seqpos) 

131 tilt = read_series( 

132 self.io_dict["in"]["input_filename_tilt"], usecols=self.seqpos) 

133 roll = read_series( 

134 self.io_dict["in"]["input_filename_roll"], usecols=self.seqpos) 

135 twist = read_series( 

136 self.io_dict["in"]["input_filename_twist"], usecols=self.seqpos) 

137 

138 if self.seqpos is None: 

139 # drop first and last columns 

140 shift = shift[shift.columns[1:-2]] 

141 slide = slide[slide.columns[1:-2]] 

142 rise = rise[rise.columns[1:-2]] 

143 tilt = tilt[tilt.columns[1:-2]] 

144 roll = roll[roll.columns[1:-2]] 

145 twist = twist[twist.columns[1:-2]] 

146 labels = [ 

147 f"{i+1}_{self.sequence[i:i+2]}" for i in range(1, len(shift.columns) + 1)] 

148 corr_index = [ 

149 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)] 

150 else: 

151 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos] 

152 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos] 

153 

154 # rename duplicated subunits 

155 shift.columns = labels 

156 slide.columns = labels 

157 rise.columns = labels 

158 tilt.columns = labels 

159 roll.columns = labels 

160 twist.columns = labels 

161 

162 # set names to each dataset 

163 shift.name = "shift" 

164 slide.name = "slide" 

165 rise.name = "rise" 

166 tilt.name = "tilt" 

167 roll.name = "roll" 

168 twist.name = "twist" 

169 

170 # get correlation between neighboring basepairs among all helical parameters 

171 results = {} 

172 datasets = [shift, slide, rise, tilt, roll, twist] 

173 for ser1, ser2 in product(datasets, datasets): 

174 ser2_shifted = ser2.shift(axis=1) 

175 ser2_shifted[labels[0]] = ser2[labels[-1]] 

176 if ( 

177 ser1.name in constants.hp_angular and ser2.name in constants.hp_angular): 

178 method = self.circular 

179 elif ( 

180 ( 

181 ser1.name in constants.hp_angular and not ( 

182 ser2.name in constants.hp_angular) 

183 ) or ( 

184 ser2.name in constants.hp_angular and not ( 

185 ser1.name in constants.hp_angular) 

186 ) 

187 ): 

188 method = self.circlineal 

189 else: 

190 method = "pearson" 

191 corr_data = ser1.corrwith(ser2_shifted, method=method) 

192 corr_data.index = corr_index 

193 results[f"{ser1.name}/{ser2.name}"] = corr_data 

194 result_df = pd.DataFrame.from_dict(results) 

195 result_df.index = corr_index 

196 

197 # save csv data 

198 result_df.to_csv(self.io_dict["out"]["output_csv_path"]) 

199 

200 # create heatmap 

201 cmap = plt.get_cmap("bwr").copy() 

202 bounds = [-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1] 

203 num = cmap.N 

204 norm = mpl.colors.BoundaryNorm(bounds, num) 

205 cmap.set_bad(color="gainsboro") 

206 fig, ax = plt.subplots( 

207 1, 

208 1, 

209 dpi=300, 

210 figsize=(7.5, 5), 

211 tight_layout=True) 

212 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect='auto') 

213 plt.colorbar(im, ticks=[-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1]) 

214 

215 # axes 

216 xlocs = np.arange(len(result_df.columns)) 

217 _ = ax.set_xticks(xlocs) 

218 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90) 

219 

220 ylocs = np.arange(len(result_df.index)) 

221 _ = ax.set_yticks(ylocs) 

222 _ = ax.set_yticklabels(result_df.index.to_list()) 

223 

224 ax.set_title( 

225 "Correlation for neighboring basepairs " 

226 "and pairs of helical parameters") 

227 

228 fig.tight_layout() 

229 fig.savefig( 

230 self.io_dict['out']['output_jpg_path'], 

231 format="jpg") 

232 plt.close() 

233 

234 # Remove temporary file(s) 

235 self.tmp_files.extend([ 

236 self.stage_io_dict.get("unique_dir"), 

237 self.tmp_folder 

238 ]) 

239 self.remove_tmp_files() 

240 

241 self.check_arguments(output_files_created=True, raise_exception=False) 

242 

243 return 0 

244 

245 @staticmethod 

246 def circular(x1, x2): 

247 x1 = x1 * np.pi / 180 

248 x2 = x2 * np.pi / 180 

249 diff_1 = np.sin(x1 - x1.mean()) 

250 diff_2 = np.sin(x2 - x2.mean()) 

251 num = (diff_1 * diff_2).sum() 

252 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) 

253 return num / den 

254 

255 @staticmethod 

256 def circlineal(x1, x2): 

257 x2 = x2 * np.pi / 180 

258 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

259 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

260 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

261 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs 

262 den = 1 - (rcs ** 2) 

263 correlation = np.sqrt(num / den) 

264 if np.corrcoef(x1, x2)[1, 0] < 0: 

265 correlation *= -1 

266 return correlation 

267 

268 

269def interbpcorr( 

270 input_filename_shift: str, input_filename_slide: str, 

271 input_filename_rise: str, input_filename_tilt: str, 

272 input_filename_roll: str, input_filename_twist: str, 

273 output_csv_path: str, output_jpg_path: str, 

274 properties: dict = None, **kwargs) -> int: 

275 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and 

276 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method.""" 

277 

278 return InterBasePairCorrelation( 

279 input_filename_shift=input_filename_shift, 

280 input_filename_slide=input_filename_slide, 

281 input_filename_rise=input_filename_rise, 

282 input_filename_tilt=input_filename_tilt, 

283 input_filename_roll=input_filename_roll, 

284 input_filename_twist=input_filename_twist, 

285 output_csv_path=output_csv_path, 

286 output_jpg_path=output_jpg_path, 

287 properties=properties, **kwargs).launch() 

288 

289 

290def main(): 

291 """Command line execution of this building block. Please check the command line documentation.""" 

292 parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.', 

293 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

294 parser.add_argument('--config', required=False, help='Configuration file') 

295 

296 required_args = parser.add_argument_group('required arguments') 

297 required_args.add_argument('--input_filename_shift', required=True, 

298 help='Path to ser file for helical parameter shift. Accepted formats: ser.') 

299 required_args.add_argument('--input_filename_slide', required=True, 

300 help='Path to ser file for helical parameter slide. Accepted formats: ser.') 

301 required_args.add_argument('--input_filename_rise', required=True, 

302 help='Path to ser file for helical parameter rise. Accepted formats: ser.') 

303 required_args.add_argument('--input_filename_tilt', required=True, 

304 help='Path to ser file for helical parameter tilt. Accepted formats: ser.') 

305 required_args.add_argument('--input_filename_roll', required=True, 

306 help='Path to ser file for helical parameter roll. Accepted formats: ser.') 

307 required_args.add_argument('--input_filename_twist', required=True, 

308 help='Path to ser file for helical parameter twist. Accepted formats: ser.') 

309 required_args.add_argument('--output_csv_path', required=True, 

310 help='Path to output file. Accepted formats: csv.') 

311 required_args.add_argument('--output_jpg_path', required=True, 

312 help='Path to output plot. Accepted formats: jpg.') 

313 

314 args = parser.parse_args() 

315 args.config = args.config or "{}" 

316 properties = settings.ConfReader(config=args.config).get_prop_dic() 

317 

318 interbpcorr( 

319 input_filename_shift=args.input_filename_shift, 

320 input_filename_slide=args.input_filename_slide, 

321 input_filename_rise=args.input_filename_rise, 

322 input_filename_tilt=args.input_filename_tilt, 

323 input_filename_roll=args.input_filename_roll, 

324 input_filename_twist=args.input_filename_twist, 

325 output_csv_path=args.output_csv_path, 

326 output_jpg_path=args.output_jpg_path, 

327 properties=properties) 

328 

329 

330if __name__ == '__main__': 

331 main()