Coverage for biobb_dna/interbp_correlations/interhpcorr.py: 81%

135 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the InterHelParCorrelation class and the command line interface.""" 

4import argparse 

5 

6import pandas as pd 

7import numpy as np 

8import matplotlib.pyplot as plt 

9 

10from biobb_common.generic.biobb_object import BiobbObject 

11from biobb_common.configuration import settings 

12from biobb_common.tools import file_utils as fu 

13from biobb_common.tools.file_utils import launchlogger 

14from biobb_dna.utils.loader import load_data 

15 

16 

17class InterHelParCorrelation(BiobbObject): 

18 """ 

19 | biobb_dna InterHelParCorrelation 

20 | Calculate correlation between helical parameters for a single inter-base pair. 

21 

22 Args: 

23 input_filename_shift (str): Path to .csv file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_shift_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

24 input_filename_slide (str): Path to .csv file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_slide_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

25 input_filename_rise (str): Path to .csv file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_rise_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

26 input_filename_tilt (str): Path to .csv file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_tilt_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

27 input_filename_roll (str): Path to .csv file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_roll_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

28 input_filename_twist (str): Path to .csv file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_twist_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

29 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_hpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

30 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_hpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

31 properties (dict): 

32 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

33 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

34 * **basepair** (*str*) - (None) Name of basepair analyzed. 

35 

36 Examples: 

37 This is a use example of how to use the building block from Python:: 

38 

39 from biobb_dna.interbp_correlations.interhpcorr import interhpcorr 

40 

41 prop = { 

42 'basepair': 'AA', 

43 } 

44 interhpcorr( 

45 input_filename_shift='path/to/shift.csv', 

46 input_filename_slide='path/to/slide.csv', 

47 input_filename_rise='path/to/rise.csv', 

48 input_filename_tilt='path/to/tilt.csv', 

49 input_filename_roll='path/to/roll.csv', 

50 input_filename_twist='path/to/twist.csv', 

51 output_csv_path='path/to/output/file.csv', 

52 output_jpg_path='path/to/output/file.jpg', 

53 properties=prop) 

54 Info: 

55 * wrapped_software: 

56 * name: In house 

57 * license: Apache-2.0 

58 * ontology: 

59 * name: EDAM 

60 * schema: http://edamontology.org/EDAM.owl 

61 

62 """ 

63 

64 def __init__( 

65 self, input_filename_shift, input_filename_slide, 

66 input_filename_rise, input_filename_tilt, 

67 input_filename_roll, input_filename_twist, 

68 output_csv_path, output_jpg_path, 

69 properties=None, **kwargs) -> None: 

70 properties = properties or {} 

71 

72 # Call parent class constructor 

73 super().__init__(properties) 

74 self.locals_var_dict = locals().copy() 

75 

76 # Input/Output files 

77 self.io_dict = { 

78 'in': { 

79 'input_filename_shift': input_filename_shift, 

80 'input_filename_slide': input_filename_slide, 

81 'input_filename_rise': input_filename_rise, 

82 'input_filename_tilt': input_filename_tilt, 

83 'input_filename_roll': input_filename_roll, 

84 'input_filename_twist': input_filename_twist 

85 }, 

86 'out': { 

87 'output_csv_path': output_csv_path, 

88 'output_jpg_path': output_jpg_path 

89 } 

90 } 

91 

92 self.properties = properties 

93 self.basepair = properties.get("basepair", None) 

94 

95 # Check the properties 

96 self.check_properties(properties) 

97 self.check_arguments() 

98 

99 @launchlogger 

100 def launch(self) -> int: 

101 """Execute the :class:`InterHelParCorrelation <interbp_correlations.interhpcorr.InterHelParCorrelation>` object.""" 

102 

103 # Setup Biobb 

104 if self.check_restart(): 

105 return 0 

106 self.stage_files() 

107 

108 # Creating temporary folder 

109 self.tmp_folder = fu.create_unique_dir(prefix="hpcorrelation_") 

110 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log) 

111 

112 # read input 

113 shift = load_data(self.io_dict["in"]["input_filename_shift"]) 

114 slide = load_data(self.io_dict["in"]["input_filename_slide"]) 

115 rise = load_data(self.io_dict["in"]["input_filename_rise"]) 

116 tilt = load_data(self.io_dict["in"]["input_filename_tilt"]) 

117 roll = load_data(self.io_dict["in"]["input_filename_roll"]) 

118 twist = load_data(self.io_dict["in"]["input_filename_twist"]) 

119 

120 # get basepair 

121 if self.basepair is None: 

122 self.basepair = shift.columns[0] 

123 

124 # make matrix 

125 coordinates = ["shift", "slide", "rise", "tilt", "roll", "twist"] 

126 corr_matrix = pd.DataFrame( 

127 np.eye(6, 6), index=coordinates, columns=coordinates) 

128 

129 # shift 

130 corr_matrix["shift"]["slide"] = shift.corrwith(slide, method="pearson") 

131 corr_matrix["shift"]["rise"] = shift.corrwith(rise, method="pearson") 

132 corr_matrix["shift"]["tilt"] = shift.corrwith( 

133 tilt, method=self.circlineal) 

134 corr_matrix["shift"]["roll"] = shift.corrwith( 

135 roll, method=self.circlineal) 

136 corr_matrix["shift"]["twist"] = shift.corrwith( 

137 twist, method=self.circlineal) 

138 # symmetric values 

139 corr_matrix["slide"]["shift"] = corr_matrix["shift"]["slide"] 

140 corr_matrix["rise"]["shift"] = corr_matrix["shift"]["rise"] 

141 corr_matrix["tilt"]["shift"] = corr_matrix["shift"]["tilt"] 

142 corr_matrix["roll"]["shift"] = corr_matrix["shift"]["roll"] 

143 corr_matrix["twist"]["shift"] = corr_matrix["shift"]["twist"] 

144 

145 # slide 

146 corr_matrix["slide"]["rise"] = slide.corrwith(rise, method="pearson") 

147 corr_matrix["slide"]["tilt"] = slide.corrwith( 

148 tilt, method=self.circlineal) 

149 corr_matrix["slide"]["roll"] = slide.corrwith( 

150 roll, method=self.circlineal) 

151 corr_matrix["slide"]["twist"] = slide.corrwith( 

152 twist, method=self.circlineal) 

153 # symmetric values 

154 corr_matrix["rise"]["slide"] = corr_matrix["slide"]["rise"] 

155 corr_matrix["tilt"]["slide"] = corr_matrix["slide"]["tilt"] 

156 corr_matrix["roll"]["slide"] = corr_matrix["slide"]["roll"] 

157 corr_matrix["twist"]["slide"] = corr_matrix["slide"]["twist"] 

158 

159 # rise 

160 corr_matrix["rise"]["tilt"] = rise.corrwith( 

161 tilt, method=self.circlineal) 

162 corr_matrix["rise"]["roll"] = rise.corrwith( 

163 roll, method=self.circlineal) 

164 corr_matrix["rise"]["twist"] = rise.corrwith( 

165 twist, method=self.circlineal) 

166 # symmetric values 

167 corr_matrix["tilt"]["rise"] = corr_matrix["rise"]["tilt"] 

168 corr_matrix["roll"]["rise"] = corr_matrix["rise"]["roll"] 

169 corr_matrix["twist"]["rise"] = corr_matrix["rise"]["twist"] 

170 

171 # tilt 

172 corr_matrix["tilt"]["roll"] = tilt.corrwith(roll, method=self.circular) 

173 corr_matrix["tilt"]["twist"] = tilt.corrwith( 

174 twist, method=self.circular) 

175 # symmetric values 

176 corr_matrix["roll"]["tilt"] = corr_matrix["tilt"]["roll"] 

177 corr_matrix["twist"]["tilt"] = corr_matrix["tilt"]["twist"] 

178 

179 # roll 

180 corr_matrix["roll"]["twist"] = roll.corrwith( 

181 twist, method=self.circular) 

182 # symmetric values 

183 corr_matrix["twist"]["roll"] = corr_matrix["roll"]["twist"] 

184 

185 # save csv data 

186 corr_matrix.to_csv(self.io_dict["out"]["output_csv_path"]) 

187 

188 # create heatmap 

189 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

190 axs.pcolor(corr_matrix) 

191 # Loop over data dimensions and create text annotations. 

192 for i in range(len(corr_matrix)): 

193 for j in range(len(corr_matrix)): 

194 axs.text( 

195 j+.5, 

196 i+.5, 

197 f"{corr_matrix[coordinates[j]].loc[coordinates[i]]:.2f}", 

198 ha="center", 

199 va="center", 

200 color="w") 

201 axs.set_xticks([i + 0.5 for i in range(len(corr_matrix))]) 

202 axs.set_xticklabels(corr_matrix.columns, rotation=90) 

203 axs.set_yticks([i+0.5 for i in range(len(corr_matrix))]) 

204 axs.set_yticklabels(corr_matrix.index) 

205 axs.set_title( 

206 "Helical Parameter Correlation " 

207 f"for Base Pair Step \'{self.basepair}\'") 

208 fig.tight_layout() 

209 fig.savefig( 

210 self.io_dict['out']['output_jpg_path'], 

211 format="jpg") 

212 plt.close() 

213 

214 # Remove temporary file(s) 

215 self.tmp_files.extend([ 

216 self.stage_io_dict.get("unique_dir"), 

217 self.tmp_folder 

218 ]) 

219 self.remove_tmp_files() 

220 

221 self.check_arguments(output_files_created=True, raise_exception=False) 

222 

223 return 0 

224 

225 def get_corr_method(self, corrtype1, corrtype2): 

226 if corrtype1 == "circular" and corrtype2 == "linear": 

227 method = self.circlineal 

228 if corrtype1 == "linear" and corrtype2 == "circular": 

229 method = self.circlineal 

230 elif corrtype1 == "circular" and corrtype2 == "circular": 

231 method = self.circular 

232 else: 

233 method = "pearson" 

234 return method 

235 

236 @staticmethod 

237 def circular(x1, x2): 

238 x1 = x1 * np.pi / 180 

239 x2 = x2 * np.pi / 180 

240 diff_1 = np.sin(x1 - x1.mean()) 

241 diff_2 = np.sin(x2 - x2.mean()) 

242 num = (diff_1 * diff_2).sum() 

243 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) 

244 return num / den 

245 

246 @staticmethod 

247 def circlineal(x1, x2): 

248 x2 = x2 * np.pi / 180 

249 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

250 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

251 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

252 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs 

253 den = 1 - (rcs ** 2) 

254 correlation = np.sqrt(num / den) 

255 if np.corrcoef(x1, x2)[1, 0] < 0: 

256 correlation *= -1 

257 return correlation 

258 

259 

260def interhpcorr( 

261 input_filename_shift: str, input_filename_slide: str, 

262 input_filename_rise: str, input_filename_tilt: str, 

263 input_filename_roll: str, input_filename_twist: str, 

264 output_csv_path: str, output_jpg_path: str, 

265 properties: dict = None, **kwargs) -> int: 

266 """Create :class:`InterHelParCorrelation <interbp_correlations.interhpcorr.InterHelParCorrelation>` class and 

267 execute the :meth:`launch() <interbp_correlations.interhpcorr.InterHelParCorrelation.launch>` method.""" 

268 

269 return InterHelParCorrelation( 

270 input_filename_shift=input_filename_shift, 

271 input_filename_slide=input_filename_slide, 

272 input_filename_rise=input_filename_rise, 

273 input_filename_tilt=input_filename_tilt, 

274 input_filename_roll=input_filename_roll, 

275 input_filename_twist=input_filename_twist, 

276 output_csv_path=output_csv_path, 

277 output_jpg_path=output_jpg_path, 

278 properties=properties, **kwargs).launch() 

279 

280 

281def main(): 

282 """Command line execution of this building block. Please check the command line documentation.""" 

283 parser = argparse.ArgumentParser(description='Load helical parameter file and save base data individually.', 

284 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

285 parser.add_argument('--config', required=False, help='Configuration file') 

286 

287 required_args = parser.add_argument_group('required arguments') 

288 required_args.add_argument('--input_filename_shift', required=True, 

289 help='Path to csv file with inputs. Accepted formats: csv.') 

290 required_args.add_argument('--input_filename_slide', required=True, 

291 help='Path to csv file with inputs. Accepted formats: csv.') 

292 required_args.add_argument('--input_filename_rise', required=True, 

293 help='Path to csv file with inputs. Accepted formats: csv.') 

294 required_args.add_argument('--input_filename_tilt', required=True, 

295 help='Path to csv file with inputs. Accepted formats: csv.') 

296 required_args.add_argument('--input_filename_roll', required=True, 

297 help='Path to csv file with inputs. Accepted formats: csv.') 

298 required_args.add_argument('--input_filename_twist', required=True, 

299 help='Path to csv file with inputs. Accepted formats: csv.') 

300 required_args.add_argument('--output_csv_path', required=True, 

301 help='Path to output file. Accepted formats: csv.') 

302 required_args.add_argument('--output_jpg_path', required=True, 

303 help='Path to output file. Accepted formats: csv.') 

304 

305 args = parser.parse_args() 

306 args.config = args.config or "{}" 

307 properties = settings.ConfReader(config=args.config).get_prop_dic() 

308 

309 interhpcorr( 

310 input_filename_shift=args.input_filename_shift, 

311 input_filename_slide=args.input_filename_slide, 

312 input_filename_rise=args.input_filename_rise, 

313 input_filename_tilt=args.input_filename_tilt, 

314 input_filename_roll=args.input_filename_roll, 

315 input_filename_twist=args.input_filename_twist, 

316 output_csv_path=args.output_csv_path, 

317 output_jpg_path=args.output_jpg_path, 

318 properties=properties) 

319 

320 

321if __name__ == '__main__': 

322 main()