Coverage for biobb_dna/interbp_correlations/interhpcorr.py: 80%

133 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 10:36 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the InterHelParCorrelation class and the command line interface.""" 

4import argparse 

5from typing import Optional 

6 

7import pandas as pd 

8import numpy as np 

9import matplotlib.pyplot as plt 

10 

11from biobb_common.generic.biobb_object import BiobbObject 

12from biobb_common.configuration import settings 

13from biobb_common.tools.file_utils import launchlogger 

14from biobb_dna.utils.loader import load_data 

15 

16 

17class InterHelParCorrelation(BiobbObject): 

18 """ 

19 | biobb_dna InterHelParCorrelation 

20 | Calculate correlation between helical parameters for a single inter-base pair. 

21 | Calculate correlation between helical parameters for a single inter-base pair. 

22 

23 Args: 

24 input_filename_shift (str): Path to .csv file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_shift_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

25 input_filename_slide (str): Path to .csv file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_slide_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

26 input_filename_rise (str): Path to .csv file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_rise_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

27 input_filename_tilt (str): Path to .csv file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_tilt_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

28 input_filename_roll (str): Path to .csv file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_roll_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

29 input_filename_twist (str): Path to .csv file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_twist_AA.csv>`_. Accepted formats: csv (edam:format_3752). 

30 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_hpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752). 

31 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_hpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579). 

32 properties (dict): 

33 * **basepair** (*str*) - (None) Name of basepair analyzed. 

34 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

35 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

36 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

37 

38 Examples: 

39 This is a use example of how to use the building block from Python:: 

40 

41 from biobb_dna.interbp_correlations.interhpcorr import interhpcorr 

42 

43 prop = { 

44 'basepair': 'AA', 

45 } 

46 interhpcorr( 

47 input_filename_shift='path/to/shift.csv', 

48 input_filename_slide='path/to/slide.csv', 

49 input_filename_rise='path/to/rise.csv', 

50 input_filename_tilt='path/to/tilt.csv', 

51 input_filename_roll='path/to/roll.csv', 

52 input_filename_twist='path/to/twist.csv', 

53 output_csv_path='path/to/output/file.csv', 

54 output_jpg_path='path/to/output/file.jpg', 

55 properties=prop) 

56 Info: 

57 * wrapped_software: 

58 * name: In house 

59 * license: Apache-2.0 

60 * ontology: 

61 * name: EDAM 

62 * schema: http://edamontology.org/EDAM.owl 

63 

64 """ 

65 

66 def __init__( 

67 self, input_filename_shift, input_filename_slide, 

68 input_filename_rise, input_filename_tilt, 

69 input_filename_roll, input_filename_twist, 

70 output_csv_path, output_jpg_path, 

71 properties=None, **kwargs) -> None: 

72 properties = properties or {} 

73 

74 # Call parent class constructor 

75 super().__init__(properties) 

76 self.locals_var_dict = locals().copy() 

77 

78 # Input/Output files 

79 self.io_dict = { 

80 'in': { 

81 'input_filename_shift': input_filename_shift, 

82 'input_filename_slide': input_filename_slide, 

83 'input_filename_rise': input_filename_rise, 

84 'input_filename_tilt': input_filename_tilt, 

85 'input_filename_roll': input_filename_roll, 

86 'input_filename_twist': input_filename_twist 

87 }, 

88 'out': { 

89 'output_csv_path': output_csv_path, 

90 'output_jpg_path': output_jpg_path 

91 } 

92 } 

93 

94 self.properties = properties 

95 self.basepair = properties.get("basepair", None) 

96 

97 # Check the properties 

98 self.check_properties(properties) 

99 self.check_arguments() 

100 

101 @launchlogger 

102 def launch(self) -> int: 

103 """Execute the :class:`InterHelParCorrelation <interbp_correlations.interhpcorr.InterHelParCorrelation>` object.""" 

104 

105 # Setup Biobb 

106 if self.check_restart(): 

107 return 0 

108 self.stage_files() 

109 

110 # read input 

111 shift = load_data(self.stage_io_dict["in"]["input_filename_shift"]) 

112 slide = load_data(self.stage_io_dict["in"]["input_filename_slide"]) 

113 rise = load_data(self.stage_io_dict["in"]["input_filename_rise"]) 

114 tilt = load_data(self.stage_io_dict["in"]["input_filename_tilt"]) 

115 roll = load_data(self.stage_io_dict["in"]["input_filename_roll"]) 

116 twist = load_data(self.stage_io_dict["in"]["input_filename_twist"]) 

117 

118 # get basepair 

119 if self.basepair is None: 

120 self.basepair = shift.columns[0] 

121 

122 # make matrix 

123 coordinates = ["shift", "slide", "rise", "tilt", "roll", "twist"] 

124 corr_matrix = pd.DataFrame( 

125 np.eye(6, 6), index=coordinates, columns=coordinates) 

126 

127 # shift 

128 # corr_matrix["shift"]["slide"] = shift.corrwith(slide, method="pearson") 

129 corr_matrix.loc["slide", "shift"] = shift.corrwith(slide, method="pearson").values[0] 

130 # corr_matrix["shift"]["rise"] = shift.corrwith(rise, method="pearson") 

131 corr_matrix.loc["rise", "shift"] = shift.corrwith(rise, method="pearson").values[0] 

132 # corr_matrix["shift"]["tilt"] = shift.corrwith(tilt, method=self.circlineal) 

133 corr_matrix.loc["tilt", "shift"] = shift.corrwith(tilt, method=self.circlineal).values[0] # type: ignore 

134 # corr_matrix["shift"]["roll"] = shift.corrwith(roll, method=self.circlineal) 

135 corr_matrix.loc["roll", "shift"] = shift.corrwith(roll, method=self.circlineal).values[0] # type: ignore 

136 # corr_matrix["shift"]["twist"] = shift.corrwith(twist, method=self.circlineal) 

137 corr_matrix.loc["twist", "shift"] = shift.corrwith(twist, method=self.circlineal).values[0] # type: ignore 

138 # symmetric values 

139 # corr_matrix["slide"]["shift"] = corr_matrix["shift"]["slide"] 

140 corr_matrix.loc["shift", "slide"] = corr_matrix.loc["slide", "shift"] 

141 # corr_matrix["rise"]["shift"] = corr_matrix["shift"]["rise"] 

142 corr_matrix.loc["shift", "rise"] = corr_matrix.loc["rise", "shift"] 

143 # corr_matrix["tilt"]["shift"] = corr_matrix["shift"]["tilt"] 

144 corr_matrix.loc["shift", "tilt"] = corr_matrix.loc["tilt", "shift"] 

145 # corr_matrix["roll"]["shift"] = corr_matrix["shift"]["roll"] 

146 corr_matrix.loc["shift", "roll"] = corr_matrix.loc["roll", "shift"] 

147 # corr_matrix["twist"]["shift"] = corr_matrix["shift"]["twist"] 

148 corr_matrix.loc["shift", "twist"] = corr_matrix.loc["twist", "shift"] 

149 

150 # slide 

151 # corr_matrix["slide"]["rise"] = slide.corrwith(rise, method="pearson") 

152 corr_matrix.loc["rise", "slide"] = slide.corrwith(rise, method="pearson").values[0] 

153 # corr_matrix["slide"]["tilt"] = slide.corrwith(tilt, method=self.circlineal) 

154 corr_matrix.loc["tilt", "slide"] = slide.corrwith(tilt, method=self.circlineal).values[0] # type: ignore 

155 # corr_matrix["slide"]["roll"] = slide.corrwith(roll, method=self.circlineal) 

156 corr_matrix.loc["roll", "slide"] = slide.corrwith(roll, method=self.circlineal).values[0] # type: ignore 

157 # corr_matrix["slide"]["twist"] = slide.corrwith(twist, method=self.circlineal) 

158 corr_matrix.loc["twist", "slide"] = slide.corrwith(twist, method=self.circlineal).values[0] # type: ignore 

159 # symmetric values 

160 # corr_matrix["rise"]["slide"] = corr_matrix["slide"]["rise"] 

161 corr_matrix.loc["slide", "rise"] = corr_matrix.loc["rise", "slide"] 

162 # corr_matrix["tilt"]["slide"] = corr_matrix["slide"]["tilt"] 

163 corr_matrix.loc["slide", "tilt"] = corr_matrix.loc["tilt", "slide"] 

164 # corr_matrix["roll"]["slide"] = corr_matrix["slide"]["roll"] 

165 corr_matrix.loc["slide", "roll"] = corr_matrix.loc["roll", "slide"] 

166 # corr_matrix["twist"]["slide"] = corr_matrix["slide"]["twist"] 

167 corr_matrix.loc["slide", "twist"] = corr_matrix.loc["twist", "slide"] 

168 

169 # rise 

170 # corr_matrix["rise"]["tilt"] = rise.corrwith(tilt, method=self.circlineal) 

171 corr_matrix.loc["tilt", "rise"] = rise.corrwith(tilt, method=self.circlineal).values[0] # type: ignore 

172 # corr_matrix["rise"]["roll"] = rise.corrwith(roll, method=self.circlineal) 

173 corr_matrix.loc["roll", "rise"] = rise.corrwith(roll, method=self.circlineal).values[0] # type: ignore 

174 # corr_matrix["rise"]["twist"] = rise.corrwith(twist, method=self.circlineal) 

175 corr_matrix.loc["twist", "rise"] = rise.corrwith(twist, method=self.circlineal).values[0] # type: ignore 

176 # symmetric values 

177 # corr_matrix["tilt"]["rise"] = corr_matrix["rise"]["tilt"] 

178 corr_matrix.loc["rise", "tilt"] = corr_matrix.loc["tilt", "rise"] 

179 # corr_matrix["roll"]["rise"] = corr_matrix["rise"]["roll"] 

180 corr_matrix.loc["rise", "roll"] = corr_matrix.loc["roll", "rise"] 

181 # corr_matrix["twist"]["rise"] = corr_matrix["rise"]["twist"] 

182 corr_matrix.loc["rise", "twist"] = corr_matrix.loc["twist", "rise"] 

183 

184 # tilt 

185 # corr_matrix["tilt"]["roll"] = tilt.corrwith(roll, method=self.circular) 

186 corr_matrix.loc["roll", "tilt"] = tilt.corrwith(roll, method=self.circular).values[0] # type: ignore 

187 # corr_matrix["tilt"]["twist"] = tilt.corrwith(twist, method=self.circular) 

188 corr_matrix.loc["twist", "tilt"] = tilt.corrwith(twist, method=self.circular).values[0] # type: ignore 

189 # symmetric values 

190 # corr_matrix["roll"]["tilt"] = corr_matrix["tilt"]["roll"] 

191 corr_matrix.loc["tilt", "roll"] = corr_matrix.loc["roll", "tilt"] 

192 # corr_matrix["twist"]["tilt"] = corr_matrix["tilt"]["twist"] 

193 corr_matrix.loc["tilt", "twist"] = corr_matrix.loc["twist", "tilt"] 

194 

195 # roll 

196 # corr_matrix["roll"]["twist"] = roll.corrwith(twist, method=self.circular) 

197 corr_matrix.loc["twist", "roll"] = roll.corrwith(twist, method=self.circular).values[0] # type: ignore 

198 # symmetric values 

199 # corr_matrix["twist"]["roll"] = corr_matrix["roll"]["twist"] 

200 corr_matrix.loc["roll", "twist"] = corr_matrix.loc["twist", "roll"] 

201 

202 # save csv data 

203 corr_matrix.to_csv(self.stage_io_dict["out"]["output_csv_path"]) 

204 

205 # create heatmap 

206 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

207 axs.pcolor(corr_matrix) 

208 # Loop over data dimensions and create text annotations. 

209 for i in range(len(corr_matrix)): 

210 for j in range(len(corr_matrix)): 

211 axs.text( 

212 j+.5, 

213 i+.5, 

214 f"{corr_matrix[coordinates[j]].loc[coordinates[i]]:.2f}", 

215 ha="center", 

216 va="center", 

217 color="w") 

218 axs.set_xticks([i + 0.5 for i in range(len(corr_matrix))]) 

219 axs.set_xticklabels(corr_matrix.columns, rotation=90) 

220 axs.set_yticks([i+0.5 for i in range(len(corr_matrix))]) 

221 axs.set_yticklabels(corr_matrix.index) 

222 axs.set_title( 

223 "Helical Parameter Correlation " 

224 f"for Base Pair Step \'{self.basepair}\'") 

225 fig.tight_layout() 

226 fig.savefig( 

227 self.stage_io_dict['out']['output_jpg_path'], 

228 format="jpg") 

229 plt.close() 

230 

231 # Copy files to host 

232 self.copy_to_host() 

233 

234 # Remove temporary file(s) 

235 # self.tmp_files.extend([ 

236 # self.stage_io_dict.get("unique_dir", ""), 

237 # ]) 

238 self.remove_tmp_files() 

239 

240 self.check_arguments(output_files_created=True, raise_exception=False) 

241 

242 return 0 

243 

244 def get_corr_method(self, corrtype1, corrtype2): 

245 if corrtype1 == "circular" and corrtype2 == "linear": 

246 method = self.circlineal 

247 if corrtype1 == "linear" and corrtype2 == "circular": 

248 method = self.circlineal 

249 elif corrtype1 == "circular" and corrtype2 == "circular": 

250 method = self.circular 

251 else: 

252 method = "pearson" 

253 return method 

254 

255 @staticmethod 

256 def circular(x1, x2): 

257 x1 = x1 * np.pi / 180 

258 x2 = x2 * np.pi / 180 

259 diff_1 = np.sin(x1 - x1.mean()) 

260 diff_2 = np.sin(x2 - x2.mean()) 

261 num = (diff_1 * diff_2).sum() 

262 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) 

263 return num / den 

264 

265 @staticmethod 

266 def circlineal(x1, x2): 

267 x2 = x2 * np.pi / 180 

268 rc = np.corrcoef(x1, np.cos(x2))[1, 0] 

269 rs = np.corrcoef(x1, np.sin(x2))[1, 0] 

270 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] 

271 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs 

272 den = 1 - (rcs ** 2) 

273 correlation = np.sqrt(num / den) 

274 if np.corrcoef(x1, x2)[1, 0] < 0: 

275 correlation *= -1 

276 return correlation 

277 

278 

279def interhpcorr( 

280 input_filename_shift: str, input_filename_slide: str, 

281 input_filename_rise: str, input_filename_tilt: str, 

282 input_filename_roll: str, input_filename_twist: str, 

283 output_csv_path: str, output_jpg_path: str, 

284 properties: Optional[dict] = None, **kwargs) -> int: 

285 """Create :class:`InterHelParCorrelation <interbp_correlations.interhpcorr.InterHelParCorrelation>` class and 

286 execute the :meth:`launch() <interbp_correlations.interhpcorr.InterHelParCorrelation.launch>` method.""" 

287 

288 return InterHelParCorrelation( 

289 input_filename_shift=input_filename_shift, 

290 input_filename_slide=input_filename_slide, 

291 input_filename_rise=input_filename_rise, 

292 input_filename_tilt=input_filename_tilt, 

293 input_filename_roll=input_filename_roll, 

294 input_filename_twist=input_filename_twist, 

295 output_csv_path=output_csv_path, 

296 output_jpg_path=output_jpg_path, 

297 properties=properties, **kwargs).launch() 

298 

299 interhpcorr.__doc__ = InterHelParCorrelation.__doc__ 

300 

301 

302def main(): 

303 """Command line execution of this building block. Please check the command line documentation.""" 

304 parser = argparse.ArgumentParser(description='Load helical parameter file and save base data individually.', 

305 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

306 parser.add_argument('--config', required=False, help='Configuration file') 

307 

308 required_args = parser.add_argument_group('required arguments') 

309 required_args.add_argument('--input_filename_shift', required=True, 

310 help='Path to csv file with inputs. Accepted formats: csv.') 

311 required_args.add_argument('--input_filename_slide', required=True, 

312 help='Path to csv file with inputs. Accepted formats: csv.') 

313 required_args.add_argument('--input_filename_rise', required=True, 

314 help='Path to csv file with inputs. Accepted formats: csv.') 

315 required_args.add_argument('--input_filename_tilt', required=True, 

316 help='Path to csv file with inputs. Accepted formats: csv.') 

317 required_args.add_argument('--input_filename_roll', required=True, 

318 help='Path to csv file with inputs. Accepted formats: csv.') 

319 required_args.add_argument('--input_filename_twist', required=True, 

320 help='Path to csv file with inputs. Accepted formats: csv.') 

321 required_args.add_argument('--output_csv_path', required=True, 

322 help='Path to output file. Accepted formats: csv.') 

323 required_args.add_argument('--output_jpg_path', required=True, 

324 help='Path to output file. Accepted formats: csv.') 

325 

326 args = parser.parse_args() 

327 args.config = args.config or "{}" 

328 properties = settings.ConfReader(config=args.config).get_prop_dic() 

329 

330 interhpcorr( 

331 input_filename_shift=args.input_filename_shift, 

332 input_filename_slide=args.input_filename_slide, 

333 input_filename_rise=args.input_filename_rise, 

334 input_filename_tilt=args.input_filename_tilt, 

335 input_filename_roll=args.input_filename_roll, 

336 input_filename_twist=args.input_filename_twist, 

337 output_csv_path=args.output_csv_path, 

338 output_jpg_path=args.output_jpg_path, 

339 properties=properties) 

340 

341 

342if __name__ == '__main__': 

343 main()