Coverage for biobb_dna/intrabp_correlations/intrabpcorr.py: 85%

144 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the IntraBasePairCorrelation class and the command line interface.""" 

4import argparse 

5from itertools import product 

6 

7import numpy as np 

8import pandas as pd 

9import matplotlib as mpl 

10import matplotlib.pyplot as plt 

11 

12from biobb_common.generic.biobb_object import BiobbObject 

13from biobb_common.configuration import settings 

14from biobb_common.tools import file_utils as fu 

15from biobb_common.tools.file_utils import launchlogger 

16from biobb_dna.utils.loader import read_series 

17from biobb_dna.utils import constants 

18 

19 

20class IntraBasePairCorrelation(BiobbObject): 

21 """ 

22 | biobb_dna IntraBasePairCorrelation 

23 | Calculate correlation between all intra-base pairs of a single sequence and for a single helical parameter. 

24 

25 Args: 

26 input_filename_shear (str): Path to .ser file with data for helical parameter 'shear'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shear.ser>`_. Accepted formats: ser (edam:format_2330). 

27 input_filename_stretch (str): Path to .ser file with data for helical parameter 'stretch'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_stretch.ser>`_. Accepted formats: ser (edam:format_2330). 

28 input_filename_stagger (str): Path to .ser file with data for helical parameter 'stagger'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_stagger.ser>`_. Accepted formats: ser (edam:format_2330). 

29 input_filename_buckle (str): Path to .ser file with data for helical parameter 'buckle'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_buckle.ser>`_. Accepted formats: ser (edam:format_2330). 

30 input_filename_propel (str): Path to .ser file with data for helical parameter 'propel'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_propel.ser>`_. Accepted formats: ser (edam:format_2330). 

31 input_filename_opening (str): Path to .ser file with data for helical parameter 'opening'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_opening.ser>`_. Accepted formats: ser (edam:format_2330). 

32 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

33 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

34 properties (dict): 

35 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

36 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

37 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

38 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

39 

40 Examples: 

41 This is a use example of how to use the building block from Python:: 

42 

43 from biobb_dna.intrabp_correlations.intrabpcorr import intrabpcorr 

44 

45 intrabpcorr( 

46 input_filename_shear='path/to/input/shear.ser', 

47 input_filename_stretch='path/to/input/stretch.ser', 

48 input_filename_stagger='path/to/input/stagger.ser', 

49 input_filename_buckle='path/to/input/buckle.ser', 

50 input_filename_propel='path/to/input/propel.ser', 

51 input_filename_opening='path/to/input/opening.ser', 

52 output_csv_path='path/to/output/file.csv', 

53 output_jpg_path='path/to/output/plot.jpg', 

54 properties=prop) 

55 Info: 

56 * wrapped_software: 

57 * name: In house 

58 * license: Apache-2.0 

59 * ontology: 

60 * name: EDAM 

61 * schema: http://edamontology.org/EDAM.owl 

62 

63 """ 

64 

65 def __init__( 

66 self, input_filename_shear, input_filename_stretch, 

67 input_filename_stagger, input_filename_buckle, 

68 input_filename_propel, input_filename_opening, 

69 output_csv_path, output_jpg_path, 

70 properties=None, **kwargs) -> None: 

71 properties = properties or {} 

72 

73 # Call parent class constructor 

74 super().__init__(properties) 

75 self.locals_var_dict = locals().copy() 

76 

77 # Input/Output files 

78 self.io_dict = { 

79 'in': { 

80 'input_filename_shear': input_filename_shear, 

81 'input_filename_stretch': input_filename_stretch, 

82 'input_filename_stagger': input_filename_stagger, 

83 'input_filename_buckle': input_filename_buckle, 

84 'input_filename_propel': input_filename_propel, 

85 'input_filename_opening': input_filename_opening 

86 }, 

87 'out': { 

88 'output_csv_path': output_csv_path, 

89 'output_jpg_path': output_jpg_path 

90 } 

91 } 

92 

93 self.properties = properties 

94 self.sequence = properties.get("sequence", None) 

95 self.seqpos = properties.get("seqpos", None) 

96 

97 # Check the properties 

98 self.check_properties(properties) 

99 self.check_arguments() 

100 

101 @launchlogger 

102 def launch(self) -> int: 

103 """Execute the :class:`HelParCorrelation <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation>` object.""" 

104 

105 # Setup Biobb 

106 if self.check_restart(): 

107 return 0 

108 self.stage_files() 

109 

110 # check sequence 

111 if self.sequence is None or len(self.sequence) < 2: 

112 raise ValueError("sequence is null or too short!") 

113 

114 # check seqpos 

115 if self.seqpos is not None: 

116 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

117 raise ValueError( 

118 "seqpos must be a list of at least two integers") 

119 

120 # Creating temporary folder 

121 self.tmp_folder = fu.create_unique_dir(prefix="bpcorrelation_") 

122 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log) 

123 

124 # read input 

125 shear = read_series( 

126 self.io_dict["in"]["input_filename_shear"], usecols=self.seqpos) 

127 stretch = read_series( 

128 self.io_dict["in"]["input_filename_stretch"], usecols=self.seqpos) 

129 stagger = read_series( 

130 self.io_dict["in"]["input_filename_stagger"], usecols=self.seqpos) 

131 buckle = read_series( 

132 self.io_dict["in"]["input_filename_buckle"], usecols=self.seqpos) 

133 propel = read_series( 

134 self.io_dict["in"]["input_filename_propel"], usecols=self.seqpos) 

135 opening = read_series( 

136 self.io_dict["in"]["input_filename_opening"], usecols=self.seqpos) 

137 

138 if self.seqpos is None: 

139 # drop first and last columns 

140 shear = shear[shear.columns[1:-1]] 

141 stretch = stretch[stretch.columns[1:-1]] 

142 stagger = stagger[stagger.columns[1:-1]] 

143 buckle = buckle[buckle.columns[1:-1]] 

144 propel = propel[propel.columns[1:-1]] 

145 opening = opening[opening.columns[1:-1]] 

146 labels = [ 

147 f"{i+1}_{self.sequence[i:i+1]}" for i in range(1, len(shear.columns) + 1)] 

148 corr_index = [ 

149 f"{self.sequence[i:i+2]}" for i in range(1, len(shear.columns) + 1)] 

150 else: 

151 labels = [f"{i+1}_{self.sequence[i:i+1]}" for i in self.seqpos] 

152 corr_index = [f"{self.sequence[i:i+2]}" for i in self.seqpos] 

153 

154 # rename duplicated subunits 

155 shear.columns = labels 

156 stretch.columns = labels 

157 stagger.columns = labels 

158 buckle.columns = labels 

159 propel.columns = labels 

160 opening.columns = labels 

161 

162 # set names to each dataset 

163 shear.name = "shear" 

164 stretch.name = "stretch" 

165 stagger.name = "stagger" 

166 buckle.name = "buckle" 

167 propel.name = "propel" 

168 opening.name = "opening" 

169 

170 # get correlation between neighboring basepairs among all helical parameters 

171 results = {} 

172 datasets = [shear, stretch, stagger, buckle, propel, opening] 

173 for ser1, ser2 in product(datasets, datasets): 

174 ser2_shifted = ser2.shift(axis=1) 

175 ser2_shifted[labels[0]] = ser2[labels[-1]] 

176 if ( 

177 ser1.name in constants.hp_angular and ser2.name in constants.hp_angular): 

178 method = self.circular 

179 elif ( 

180 ( 

181 ser1.name in constants.hp_angular and not ( 

182 ser2.name in constants.hp_angular) 

183 ) or ( 

184 ser2.name in constants.hp_angular and not ( 

185 ser1.name in constants.hp_angular) 

186 ) 

187 ): 

188 method = self.circlineal 

189 else: 

190 method = "pearson" 

191 corr_data = ser1.corrwith(ser2_shifted, method=method) 

192 corr_data.index = corr_index 

193 results[f"{ser1.name}/{ser2.name}"] = corr_data 

194 result_df = pd.DataFrame.from_dict(results) 

195 result_df.index = corr_index 

196 

197 # save csv data 

198 result_df.to_csv(self.io_dict["out"]["output_csv_path"]) 

199 

200 # create heatmap 

201 cmap = plt.get_cmap("bwr").copy() 

202 bounds = [-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1] 

203 num = cmap.N 

204 norm = mpl.colors.BoundaryNorm(bounds, num) 

205 cmap.set_bad(color="gainsboro") 

206 fig, ax = plt.subplots( 

207 1, 

208 1, 

209 dpi=300, 

210 figsize=(7.5, 5), 

211 tight_layout=True) 

212 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect='auto') 

213 plt.colorbar(im, ticks=[-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1]) 

214 

215 # axes 

216 xlocs = np.arange(len(result_df.columns)) 

217 _ = ax.set_xticks(xlocs) 

218 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90) 

219 

220 ylocs = np.arange(len(result_df.index)) 

221 _ = ax.set_yticks(ylocs) 

222 _ = ax.set_yticklabels(result_df.index.to_list()) 

223 

224 ax.set_title( 

225 "Correlation for neighboring basepairs " 

226 "and pairs of helical parameters") 

227 

228 fig.tight_layout() 

229 fig.savefig( 

230 self.io_dict['out']['output_jpg_path'], 

231 format="jpg") 

232 plt.close() 

233 

234 # Remove temporary file(s) 

235 self.tmp_files.extend([ 

236 self.stage_io_dict.get("unique_dir"), 

237 self.tmp_folder 

238 ]) 

239 self.remove_tmp_files() 

240 

241 self.check_arguments(output_files_created=True, raise_exception=False) 

242 

243 return 0 

244 

245 @staticmethod 

246 def circular(x1, x2): 

247 x1 = x1 * np.pi / 180 

248 x2 = x2 * np.pi / 180 

249 diff_1 = np.sin(x1 - x1.mean()) 

250 diff_2 = np.sin(x2 - x2.mean()) 

251 num = (diff_1 * diff_2).sum() 

252 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) 

253 return num / den 

254 

255 @staticmethod 

256 def circlineal(x1, x2): 

257 x2 = x2 * np.pi / 180 

258 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

259 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

260 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

261 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs 

262 den = 1 - (rcs ** 2) 

263 correlation = np.sqrt(num / den) 

264 if np.corrcoef(x1, x2)[1, 0] < 0: 

265 correlation *= -1 

266 return correlation 

267 

268 

269def intrabpcorr( 

270 input_filename_shear: str, input_filename_stretch: str, 

271 input_filename_stagger: str, input_filename_buckle: str, 

272 input_filename_propel: str, input_filename_opening: str, 

273 output_csv_path: str, output_jpg_path: str, 

274 properties: dict = None, **kwargs) -> int: 

275 """Create :class:`HelParCorrelation <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation>` class and 

276 execute the :meth:`launch() <intrabp_correlations.intrabpcorr.IntraBasePairCorrelation.launch>` method.""" 

277 

278 return IntraBasePairCorrelation( 

279 input_filename_shear=input_filename_shear, 

280 input_filename_stretch=input_filename_stretch, 

281 input_filename_stagger=input_filename_stagger, 

282 input_filename_buckle=input_filename_buckle, 

283 input_filename_propel=input_filename_propel, 

284 input_filename_opening=input_filename_opening, 

285 output_csv_path=output_csv_path, 

286 output_jpg_path=output_jpg_path, 

287 properties=properties, **kwargs).launch() 

288 

289 

290def main(): 

291 """Command line execution of this building block. Please check the command line documentation.""" 

292 parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.', 

293 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

294 parser.add_argument('--config', required=False, help='Configuration file') 

295 

296 required_args = parser.add_argument_group('required arguments') 

297 required_args.add_argument('--input_filename_shear', required=True, 

298 help='Path to ser file for helical parameter shear. Accepted formats: ser.') 

299 required_args.add_argument('--input_filename_stretch', required=True, 

300 help='Path to ser file for helical parameter stretch. Accepted formats: ser.') 

301 required_args.add_argument('--input_filename_stagger', required=True, 

302 help='Path to ser file for helical parameter stagger. Accepted formats: ser.') 

303 required_args.add_argument('--input_filename_buckle', required=True, 

304 help='Path to ser file for helical parameter buckle. Accepted formats: ser.') 

305 required_args.add_argument('--input_filename_propel', required=True, 

306 help='Path to ser file for helical parameter propel. Accepted formats: ser.') 

307 required_args.add_argument('--input_filename_opening', required=True, 

308 help='Path to ser file for helical parameter opening. Accepted formats: ser.') 

309 required_args.add_argument('--output_csv_path', required=True, 

310 help='Path to output file. Accepted formats: csv.') 

311 required_args.add_argument('--output_jpg_path', required=True, 

312 help='Path to output plot. Accepted formats: jpg.') 

313 

314 args = parser.parse_args() 

315 args.config = args.config or "{}" 

316 properties = settings.ConfReader(config=args.config).get_prop_dic() 

317 

318 intrabpcorr( 

319 input_filename_shear=args.input_filename_shear, 

320 input_filename_stretch=args.input_filename_stretch, 

321 input_filename_stagger=args.input_filename_stagger, 

322 input_filename_buckle=args.input_filename_buckle, 

323 input_filename_propel=args.input_filename_propel, 

324 input_filename_opening=args.input_filename_opening, 

325 output_csv_path=args.output_csv_path, 

326 output_jpg_path=args.output_jpg_path, 

327 properties=properties) 

328 

329 

330if __name__ == '__main__': 

331 main()