Coverage for biobb_dna / interbp_correlations / interbpcorr.py: 94%
127 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-15 18:49 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-15 18:49 +0000
1#!/usr/bin/env python3
3"""Module containing the InterBasePairCorrelation class and the command line interface."""
4from itertools import product
5from typing import Optional
7import matplotlib as mpl
8import matplotlib.pyplot as plt
9import numpy as np
10import pandas as pd
11from biobb_common.generic.biobb_object import BiobbObject
12from biobb_common.tools.file_utils import launchlogger
14from biobb_dna.utils import constants
15from biobb_dna.utils.common import _from_string_to_list
16from biobb_dna.utils.loader import read_series
19class InterBasePairCorrelation(BiobbObject):
20 """
21 | biobb_dna InterBasePairCorrelation
22 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter.
23 | Calculate correlation between neighboring base pairs and pairs of helical parameters.
25 Args:
26 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
27 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330).
28 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330).
29 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330).
30 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
31 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330).
32 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
33 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
34 properties (dict):
35 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
36 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
37 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
38 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
39 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
41 Examples:
42 This is a use example of how to use the building block from Python::
44 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr
46 interbpcorr(
47 input_filename_shift='path/to/input/shift.ser',
48 input_filename_slide='path/to/input/slide.ser',
49 input_filename_rise='path/to/input/slide.ser',
50 input_filename_tilt='path/to/input/tilt.ser',
51 input_filename_roll='path/to/input/roll.ser',
52 input_filename_twist='path/to/input/twist.ser',
53 output_csv_path='path/to/output/file.csv',
54 output_jpg_path='path/to/output/plot.jpg',
55 properties=prop)
56 Info:
57 * wrapped_software:
58 * name: In house
59 * license: Apache-2.0
60 * ontology:
61 * name: EDAM
62 * schema: http://edamontology.org/EDAM.owl
64 """
66 def __init__(
67 self,
68 input_filename_shift,
69 input_filename_slide,
70 input_filename_rise,
71 input_filename_tilt,
72 input_filename_roll,
73 input_filename_twist,
74 output_csv_path,
75 output_jpg_path,
76 properties=None,
77 **kwargs,
78 ) -> None:
79 properties = properties or {}
81 # Call parent class constructor
82 super().__init__(properties)
83 self.locals_var_dict = locals().copy()
85 # Input/Output files
86 self.io_dict = {
87 "in": {
88 "input_filename_shift": input_filename_shift,
89 "input_filename_slide": input_filename_slide,
90 "input_filename_rise": input_filename_rise,
91 "input_filename_tilt": input_filename_tilt,
92 "input_filename_roll": input_filename_roll,
93 "input_filename_twist": input_filename_twist,
94 },
95 "out": {
96 "output_csv_path": output_csv_path,
97 "output_jpg_path": output_jpg_path,
98 },
99 }
101 self.properties = properties
102 self.sequence = properties.get("sequence", None)
103 self.seqpos = [
104 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
105 ]
107 # Check the properties
108 self.check_properties(properties)
109 self.check_arguments()
111 @launchlogger
112 def launch(self) -> int:
113 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object."""
115 # Setup Biobb
116 if self.check_restart():
117 return 0
118 self.stage_files()
120 # check sequence
121 if self.sequence is None or len(self.sequence) < 2:
122 raise ValueError("sequence is null or too short!")
124 # check seqpos
125 if self.seqpos:
126 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
127 raise ValueError("seqpos must be a list of at least two integers")
128 else:
129 self.seqpos = None # type: ignore
131 # read input
132 shift = read_series(
133 self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos
134 )
135 slide = read_series(
136 self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos
137 )
138 rise = read_series(
139 self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos
140 )
141 tilt = read_series(
142 self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos
143 )
144 roll = read_series(
145 self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos
146 )
147 twist = read_series(
148 self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos
149 )
151 if not self.seqpos:
152 # drop first and last columns
153 shift = shift[shift.columns[1:-2]]
154 slide = slide[slide.columns[1:-2]]
155 rise = rise[rise.columns[1:-2]]
156 tilt = tilt[tilt.columns[1:-2]]
157 roll = roll[roll.columns[1:-2]]
158 twist = twist[twist.columns[1:-2]]
159 labels = [
160 f"{i+1}_{self.sequence[i:i+2]}"
161 for i in range(1, len(shift.columns) + 1)
162 ]
163 corr_index = [
164 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)
165 ]
166 else:
167 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
168 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos]
170 # rename duplicated subunits
171 shift.columns = labels
172 slide.columns = labels
173 rise.columns = labels
174 tilt.columns = labels
175 roll.columns = labels
176 twist.columns = labels
178 # set names to each dataset
179 shift.name = "shift"
180 slide.name = "slide"
181 rise.name = "rise"
182 tilt.name = "tilt"
183 roll.name = "roll"
184 twist.name = "twist"
186 # get correlation between neighboring basepairs among all helical parameters
187 results = {}
188 datasets = [shift, slide, rise, tilt, roll, twist]
189 for ser1, ser2 in product(datasets, datasets):
190 ser2_shifted = ser2.shift(axis=1)
191 ser2_shifted[labels[0]] = ser2[labels[-1]]
192 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular:
193 method = self.circular
194 elif (
195 ser1.name in constants.hp_angular and ser2.name not in constants.hp_angular
196 ) or (
197 ser2.name in constants.hp_angular and ser1.name not in constants.hp_angular
198 ):
199 method = self.circlineal
200 else:
201 method = "pearson" # type: ignore
202 corr_data = ser1.corrwith(ser2_shifted, method=method)
203 corr_data.index = corr_index
204 results[f"{ser1.name}/{ser2.name}"] = corr_data
205 result_df = pd.DataFrame.from_dict(results)
206 result_df.index = corr_index # type: ignore
208 # save csv data
209 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"])
211 # create heatmap
212 cmap = plt.get_cmap("bwr").copy()
213 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]
214 num = cmap.N
215 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore
216 cmap.set_bad(color="gainsboro")
217 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True)
218 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto")
219 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1])
221 # axes
222 xlocs = np.arange(len(result_df.columns))
223 _ = ax.set_xticks(xlocs)
224 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90)
226 ylocs = np.arange(len(result_df.index))
227 _ = ax.set_yticks(ylocs)
228 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore
230 ax.set_title(
231 "Correlation for neighboring basepairs " "and pairs of helical parameters"
232 )
234 fig.tight_layout()
235 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
236 plt.close()
238 # Copy files to host
239 self.copy_to_host()
241 # Remove temporary file(s)
242 self.remove_tmp_files()
244 self.check_arguments(output_files_created=True, raise_exception=False)
246 return 0
248 @staticmethod
249 def circular(x1, x2):
250 x1 = x1 * np.pi / 180
251 x2 = x2 * np.pi / 180
252 diff_1 = np.sin(x1 - x1.mean())
253 diff_2 = np.sin(x2 - x2.mean())
254 num = (diff_1 * diff_2).sum()
255 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum())
256 return num / den
258 @staticmethod
259 def circlineal(x1, x2):
260 x2 = x2 * np.pi / 180
261 rc = np.corrcoef(x1, np.cos(x2))[1, 0]
262 rs = np.corrcoef(x1, np.sin(x2))[1, 0]
263 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
264 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs
265 den = 1 - (rcs**2)
266 correlation = np.sqrt(num / den)
267 if np.corrcoef(x1, x2)[1, 0] < 0:
268 correlation *= -1
269 return correlation
272def interbpcorr(
273 input_filename_shift: str,
274 input_filename_slide: str,
275 input_filename_rise: str,
276 input_filename_tilt: str,
277 input_filename_roll: str,
278 input_filename_twist: str,
279 output_csv_path: str,
280 output_jpg_path: str,
281 properties: Optional[dict] = None,
282 **kwargs,
283) -> int:
284 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and
285 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method."""
286 return InterBasePairCorrelation(**dict(locals())).launch()
289interbpcorr.__doc__ = InterBasePairCorrelation.__doc__
290main = InterBasePairCorrelation.get_main(interbpcorr, "Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.")
292if __name__ == '__main__':
293 main()