Coverage for biobb_dna/interbp_correlations/interhpcorr.py: 80%
133 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
1#!/usr/bin/env python3
3"""Module containing the InterHelParCorrelation class and the command line interface."""
4import argparse
5from typing import Optional
7import pandas as pd
8import numpy as np
9import matplotlib.pyplot as plt
11from biobb_common.generic.biobb_object import BiobbObject
12from biobb_common.configuration import settings
13from biobb_common.tools.file_utils import launchlogger
14from biobb_dna.utils.loader import load_data
class InterHelParCorrelation(BiobbObject):
    """
    | biobb_dna InterHelParCorrelation
    | Calculate correlation between helical parameters for a single inter-base pair.
    | Calculate correlation between helical parameters for a single inter-base pair.

    Args:
        input_filename_shift (str): Path to .csv file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_shift_AA.csv>`_. Accepted formats: csv (edam:format_3752).
        input_filename_slide (str): Path to .csv file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_slide_AA.csv>`_. Accepted formats: csv (edam:format_3752).
        input_filename_rise (str): Path to .csv file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_rise_AA.csv>`_. Accepted formats: csv (edam:format_3752).
        input_filename_tilt (str): Path to .csv file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_tilt_AA.csv>`_. Accepted formats: csv (edam:format_3752).
        input_filename_roll (str): Path to .csv file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_roll_AA.csv>`_. Accepted formats: csv (edam:format_3752).
        input_filename_twist (str): Path to .csv file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/stiffness/series_twist_AA.csv>`_. Accepted formats: csv (edam:format_3752).
        output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_hpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
        output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_hpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
        properties (dict):
            * **basepair** (*str*) - (None) Name of basepair analyzed.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_dna.interbp_correlations.interhpcorr import interhpcorr

            prop = {
                'basepair': 'AA',
            }
            interhpcorr(
                input_filename_shift='path/to/shift.csv',
                input_filename_slide='path/to/slide.csv',
                input_filename_rise='path/to/rise.csv',
                input_filename_tilt='path/to/tilt.csv',
                input_filename_roll='path/to/roll.csv',
                input_filename_twist='path/to/twist.csv',
                output_csv_path='path/to/output/file.csv',
                output_jpg_path='path/to/output/file.jpg',
                properties=prop)
    Info:
        * wrapped_software:
            * name: In house
            * license: Apache-2.0
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(
            self, input_filename_shift, input_filename_slide,
            input_filename_rise, input_filename_tilt,
            input_filename_roll, input_filename_twist,
            output_csv_path, output_jpg_path,
            properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor arguments; must be taken before any
        # additional local variable is created in this method.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            'in': {
                'input_filename_shift': input_filename_shift,
                'input_filename_slide': input_filename_slide,
                'input_filename_rise': input_filename_rise,
                'input_filename_tilt': input_filename_tilt,
                'input_filename_roll': input_filename_roll,
                'input_filename_twist': input_filename_twist
            },
            'out': {
                'output_csv_path': output_csv_path,
                'output_jpg_path': output_jpg_path
            }
        }

        self.properties = properties
        self.basepair = properties.get("basepair", None)

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`InterHelParCorrelation <interbp_correlations.interhpcorr.InterHelParCorrelation>` object."""

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # Row/column order of the output matrix. The three linear parameters
        # are listed before the three angular ones; _correlation_matrix
        # relies on that ordering.
        coordinates = ["shift", "slide", "rise", "tilt", "roll", "twist"]

        # read input, one dataset per helical parameter
        staged_inputs = self.stage_io_dict["in"]
        series = {
            name: load_data(staged_inputs[f"input_filename_{name}"])
            for name in coordinates}

        # get basepair: default to the first column name of the shift data
        if self.basepair is None:
            self.basepair = series["shift"].columns[0]

        # make matrix
        corr_matrix = self._correlation_matrix(series, coordinates)

        # save csv data
        corr_matrix.to_csv(self.stage_io_dict["out"]["output_csv_path"])

        # create heatmap
        self._save_heatmap(corr_matrix)

        # Copy files to host
        self.copy_to_host()

        # Remove temporary file(s)
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

    def _correlation_matrix(self, series, coordinates):
        """Build the symmetric parameter-vs-parameter correlation matrix.

        Args:
            series (dict): maps each helical parameter name to the dataset
                returned by ``load_data`` for it.
            coordinates (list): parameter names in matrix order. Linear
                parameters must precede angular ones so that, for mixed
                pairs, the linear data is passed first to ``circlineal``.

        Returns:
            pandas.DataFrame: square matrix with ones on the diagonal and the
            correlation of the first column of each pair of datasets
            elsewhere.
        """
        angular = {"tilt", "roll", "twist"}
        corr_matrix = pd.DataFrame(
            np.eye(len(coordinates)), index=coordinates, columns=coordinates)

        for position, first in enumerate(coordinates):
            # only pairs above the diagonal are computed; each value is
            # mirrored to keep the matrix symmetric
            for second in coordinates[position + 1:]:
                method = self.get_corr_method(
                    "circular" if first in angular else "linear",
                    "circular" if second in angular else "linear")
                value = series[first].corrwith(
                    series[second], method=method).values[0]  # type: ignore
                corr_matrix.loc[second, first] = value
                corr_matrix.loc[first, second] = value
        return corr_matrix

    def _save_heatmap(self, corr_matrix):
        """Draw the annotated correlation heatmap and save it as a jpg file."""
        fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
        axs.pcolor(corr_matrix)

        # Loop over data dimensions and create text annotations, centred in
        # each cell (cells are one unit wide, hence the 0.5 offset).
        for row, row_name in enumerate(corr_matrix.index):
            for col, col_name in enumerate(corr_matrix.columns):
                axs.text(
                    col + .5,
                    row + .5,
                    f"{corr_matrix.loc[row_name, col_name]:.2f}",
                    ha="center",
                    va="center",
                    color="w")

        cell_centers = [tick + 0.5 for tick in range(len(corr_matrix))]
        axs.set_xticks(cell_centers)
        axs.set_xticklabels(corr_matrix.columns, rotation=90)
        axs.set_yticks(cell_centers)
        axs.set_yticklabels(corr_matrix.index)
        axs.set_title(
            "Helical Parameter Correlation "
            f"for Base Pair Step '{self.basepair}'")
        fig.tight_layout()
        fig.savefig(
            self.stage_io_dict['out']['output_jpg_path'],
            format="jpg")
        # close this specific figure rather than whichever one is current
        plt.close(fig)

    def get_corr_method(self, corrtype1, corrtype2):
        """Select the correlation method for a pair of variable types.

        Args:
            corrtype1 (str): ``"linear"`` or ``"circular"``.
            corrtype2 (str): ``"linear"`` or ``"circular"``.

        Returns:
            ``self.circlineal`` when exactly one variable is circular (in
            either argument order), ``self.circular`` when both are, and the
            string ``"pearson"`` otherwise. Note that ``circlineal`` expects
            the linear data as its first argument; ordering the data is the
            caller's responsibility.
        """
        # The mixed case is tested order-independently: previously
        # ("circular", "linear") fell through to "pearson" because the second
        # test started a new if/elif/else chain.
        if {corrtype1, corrtype2} == {"circular", "linear"}:
            return self.circlineal
        if corrtype1 == "circular" and corrtype2 == "circular":
            return self.circular
        return "pearson"

    @staticmethod
    def circular(x1, x2):
        """Correlation between two angular variables given in degrees.

        Both inputs are converted to radians; the result is the normalised
        sum of products of the sines of the deviations from each mean.
        """
        x1 = x1 * np.pi / 180
        x2 = x2 * np.pi / 180
        diff_1 = np.sin(x1 - x1.mean())
        diff_2 = np.sin(x2 - x2.mean())
        num = (diff_1 * diff_2).sum()
        den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum())
        return num / den

    @staticmethod
    def circlineal(x1, x2):
        """Correlation between a linear variable and an angular one.

        Args:
            x1: linear data.
            x2: angular data in degrees (converted to radians here).

        Returns:
            The magnitude is built from the Pearson correlations of ``x1``
            with ``cos(x2)`` and ``sin(x2)``, corrected for the correlation
            between ``sin(x2)`` and ``cos(x2)``. The sign is negative when
            the plain Pearson correlation of ``x1`` and ``x2`` is negative.
        """
        x2 = x2 * np.pi / 180
        rc = np.corrcoef(x1, np.cos(x2))[1, 0]
        rs = np.corrcoef(x1, np.sin(x2))[1, 0]
        rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
        num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs
        den = 1 - (rcs ** 2)
        correlation = np.sqrt(num / den)
        if np.corrcoef(x1, x2)[1, 0] < 0:
            correlation *= -1
        return correlation
def interhpcorr(
        input_filename_shift: str, input_filename_slide: str,
        input_filename_rise: str, input_filename_tilt: str,
        input_filename_roll: str, input_filename_twist: str,
        output_csv_path: str, output_jpg_path: str,
        properties: Optional[dict] = None, **kwargs) -> int:
    """Create :class:`InterHelParCorrelation <interbp_correlations.interhpcorr.InterHelParCorrelation>` class and
    execute the :meth:`launch() <interbp_correlations.interhpcorr.InterHelParCorrelation.launch>` method."""

    return InterHelParCorrelation(
        input_filename_shift=input_filename_shift,
        input_filename_slide=input_filename_slide,
        input_filename_rise=input_filename_rise,
        input_filename_tilt=input_filename_tilt,
        input_filename_roll=input_filename_roll,
        input_filename_twist=input_filename_twist,
        output_csv_path=output_csv_path,
        output_jpg_path=output_jpg_path,
        properties=properties, **kwargs).launch()


# Give the wrapper the class documentation. This assignment used to sit inside
# the function body after the return statement, where it could never execute.
interhpcorr.__doc__ = InterHelParCorrelation.__doc__
def main():
    """Command line execution of this building block. Please check the command line documentation."""
    parser = argparse.ArgumentParser(
        description='Calculate correlation between helical parameters for a single inter-base pair.',
        formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
    parser.add_argument('--config', required=False, help='Configuration file')

    required_args = parser.add_argument_group('required arguments')
    # one required csv input per helical parameter
    for parameter in ('shift', 'slide', 'rise', 'tilt', 'roll', 'twist'):
        required_args.add_argument(
            f'--input_filename_{parameter}', required=True,
            help='Path to csv file with inputs. Accepted formats: csv.')
    required_args.add_argument('--output_csv_path', required=True,
                               help='Path to output file. Accepted formats: csv.')
    # help text previously advertised csv for the jpg output
    required_args.add_argument('--output_jpg_path', required=True,
                               help='Path to output file. Accepted formats: jpg.')

    args = parser.parse_args()
    # fall back to an empty configuration when --config is not given
    args.config = args.config or "{}"
    properties = settings.ConfReader(config=args.config).get_prop_dic()

    interhpcorr(
        input_filename_shift=args.input_filename_shift,
        input_filename_slide=args.input_filename_slide,
        input_filename_rise=args.input_filename_rise,
        input_filename_tilt=args.input_filename_tilt,
        input_filename_roll=args.input_filename_roll,
        input_filename_twist=args.input_filename_twist,
        output_csv_path=args.output_csv_path,
        output_jpg_path=args.output_jpg_path,
        properties=properties)


if __name__ == '__main__':
    main()