Coverage for biobb_dna/interbp_correlations/interbpcorr.py: 84%
144 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
1#!/usr/bin/env python3
3"""Module containing the InterBasePairCorrelation class and the command line interface."""
5import argparse
6from itertools import product
7from typing import Optional
9import matplotlib as mpl
10import matplotlib.pyplot as plt
11import numpy as np
12import pandas as pd
13from biobb_common.configuration import settings
14from biobb_common.generic.biobb_object import BiobbObject
15from biobb_common.tools.file_utils import launchlogger
17from biobb_dna.utils import constants
18from biobb_dna.utils.common import _from_string_to_list
19from biobb_dna.utils.loader import read_series
22class InterBasePairCorrelation(BiobbObject):
23 """
24 | biobb_dna InterBasePairCorrelation
25 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter.
26 | Calculate correlation between neighboring base pairs and pairs of helical parameters.
28 Args:
29 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
30 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330).
31 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330).
32 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330).
33 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
34 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330).
35 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
36 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
37 properties (dict):
38 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
39 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
40 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
41 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
42 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
44 Examples:
45 This is a use example of how to use the building block from Python::
47 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr
49 interbpcorr(
50 input_filename_shift='path/to/input/shift.ser',
51 input_filename_slide='path/to/input/slide.ser',
52 input_filename_rise='path/to/input/slide.ser',
53 input_filename_tilt='path/to/input/tilt.ser',
54 input_filename_roll='path/to/input/roll.ser',
55 input_filename_twist='path/to/input/twist.ser',
56 output_csv_path='path/to/output/file.csv',
57 output_jpg_path='path/to/output/plot.jpg',
58 properties=prop)
59 Info:
60 * wrapped_software:
61 * name: In house
62 * license: Apache-2.0
63 * ontology:
64 * name: EDAM
65 * schema: http://edamontology.org/EDAM.owl
67 """
69 def __init__(
70 self,
71 input_filename_shift,
72 input_filename_slide,
73 input_filename_rise,
74 input_filename_tilt,
75 input_filename_roll,
76 input_filename_twist,
77 output_csv_path,
78 output_jpg_path,
79 properties=None,
80 **kwargs,
81 ) -> None:
82 properties = properties or {}
84 # Call parent class constructor
85 super().__init__(properties)
86 self.locals_var_dict = locals().copy()
88 # Input/Output files
89 self.io_dict = {
90 "in": {
91 "input_filename_shift": input_filename_shift,
92 "input_filename_slide": input_filename_slide,
93 "input_filename_rise": input_filename_rise,
94 "input_filename_tilt": input_filename_tilt,
95 "input_filename_roll": input_filename_roll,
96 "input_filename_twist": input_filename_twist,
97 },
98 "out": {
99 "output_csv_path": output_csv_path,
100 "output_jpg_path": output_jpg_path,
101 },
102 }
104 self.properties = properties
105 self.sequence = properties.get("sequence", None)
106 self.seqpos = [
107 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
108 ]
110 # Check the properties
111 self.check_properties(properties)
112 self.check_arguments()
114 @launchlogger
115 def launch(self) -> int:
116 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object."""
118 # Setup Biobb
119 if self.check_restart():
120 return 0
121 self.stage_files()
123 # check sequence
124 if self.sequence is None or len(self.sequence) < 2:
125 raise ValueError("sequence is null or too short!")
127 # check seqpos
128 if self.seqpos:
129 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
130 raise ValueError("seqpos must be a list of at least two integers")
131 else:
132 self.seqpos = None # type: ignore
134 # read input
135 shift = read_series(
136 self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos
137 )
138 slide = read_series(
139 self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos
140 )
141 rise = read_series(
142 self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos
143 )
144 tilt = read_series(
145 self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos
146 )
147 roll = read_series(
148 self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos
149 )
150 twist = read_series(
151 self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos
152 )
154 if not self.seqpos:
155 # drop first and last columns
156 shift = shift[shift.columns[1:-2]]
157 slide = slide[slide.columns[1:-2]]
158 rise = rise[rise.columns[1:-2]]
159 tilt = tilt[tilt.columns[1:-2]]
160 roll = roll[roll.columns[1:-2]]
161 twist = twist[twist.columns[1:-2]]
162 labels = [
163 f"{i+1}_{self.sequence[i:i+2]}"
164 for i in range(1, len(shift.columns) + 1)
165 ]
166 corr_index = [
167 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)
168 ]
169 else:
170 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
171 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos]
173 # rename duplicated subunits
174 shift.columns = labels
175 slide.columns = labels
176 rise.columns = labels
177 tilt.columns = labels
178 roll.columns = labels
179 twist.columns = labels
181 # set names to each dataset
182 shift.name = "shift"
183 slide.name = "slide"
184 rise.name = "rise"
185 tilt.name = "tilt"
186 roll.name = "roll"
187 twist.name = "twist"
189 # get correlation between neighboring basepairs among all helical parameters
190 results = {}
191 datasets = [shift, slide, rise, tilt, roll, twist]
192 for ser1, ser2 in product(datasets, datasets):
193 ser2_shifted = ser2.shift(axis=1)
194 ser2_shifted[labels[0]] = ser2[labels[-1]]
195 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular:
196 method = self.circular
197 elif (
198 ser1.name in constants.hp_angular and ser2.name not in constants.hp_angular
199 ) or (
200 ser2.name in constants.hp_angular and ser1.name not in constants.hp_angular
201 ):
202 method = self.circlineal
203 else:
204 method = "pearson" # type: ignore
205 corr_data = ser1.corrwith(ser2_shifted, method=method)
206 corr_data.index = corr_index
207 results[f"{ser1.name}/{ser2.name}"] = corr_data
208 result_df = pd.DataFrame.from_dict(results)
209 result_df.index = corr_index # type: ignore
211 # save csv data
212 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"])
214 # create heatmap
215 cmap = plt.get_cmap("bwr").copy()
216 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]
217 num = cmap.N
218 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore
219 cmap.set_bad(color="gainsboro")
220 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True)
221 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto")
222 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1])
224 # axes
225 xlocs = np.arange(len(result_df.columns))
226 _ = ax.set_xticks(xlocs)
227 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90)
229 ylocs = np.arange(len(result_df.index))
230 _ = ax.set_yticks(ylocs)
231 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore
233 ax.set_title(
234 "Correlation for neighboring basepairs " "and pairs of helical parameters"
235 )
237 fig.tight_layout()
238 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
239 plt.close()
241 # Copy files to host
242 self.copy_to_host()
244 # Remove temporary file(s)
245 # self.tmp_files.extend([
246 # self.stage_io_dict.get("unique_dir", ""),
247 # ])
248 self.remove_tmp_files()
250 self.check_arguments(output_files_created=True, raise_exception=False)
252 return 0
254 @staticmethod
255 def circular(x1, x2):
256 x1 = x1 * np.pi / 180
257 x2 = x2 * np.pi / 180
258 diff_1 = np.sin(x1 - x1.mean())
259 diff_2 = np.sin(x2 - x2.mean())
260 num = (diff_1 * diff_2).sum()
261 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum())
262 return num / den
264 @staticmethod
265 def circlineal(x1, x2):
266 x2 = x2 * np.pi / 180
267 rc = np.corrcoef(x1, np.cos(x2))[1, 0]
268 rs = np.corrcoef(x1, np.sin(x2))[1, 0]
269 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
270 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs
271 den = 1 - (rcs**2)
272 correlation = np.sqrt(num / den)
273 if np.corrcoef(x1, x2)[1, 0] < 0:
274 correlation *= -1
275 return correlation
278def interbpcorr(
279 input_filename_shift: str,
280 input_filename_slide: str,
281 input_filename_rise: str,
282 input_filename_tilt: str,
283 input_filename_roll: str,
284 input_filename_twist: str,
285 output_csv_path: str,
286 output_jpg_path: str,
287 properties: Optional[dict] = None,
288 **kwargs,
289) -> int:
290 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and
291 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method."""
293 return InterBasePairCorrelation(
294 input_filename_shift=input_filename_shift,
295 input_filename_slide=input_filename_slide,
296 input_filename_rise=input_filename_rise,
297 input_filename_tilt=input_filename_tilt,
298 input_filename_roll=input_filename_roll,
299 input_filename_twist=input_filename_twist,
300 output_csv_path=output_csv_path,
301 output_jpg_path=output_jpg_path,
302 properties=properties,
303 **kwargs,
304 ).launch()
306 interbpcorr.__doc__ = InterBasePairCorrelation.__doc__
309def main():
310 """Command line execution of this building block. Please check the command line documentation."""
311 parser = argparse.ArgumentParser(
312 description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.",
313 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
314 )
315 parser.add_argument("--config", required=False, help="Configuration file")
317 required_args = parser.add_argument_group("required arguments")
318 required_args.add_argument(
319 "--input_filename_shift",
320 required=True,
321 help="Path to ser file for helical parameter shift. Accepted formats: ser.",
322 )
323 required_args.add_argument(
324 "--input_filename_slide",
325 required=True,
326 help="Path to ser file for helical parameter slide. Accepted formats: ser.",
327 )
328 required_args.add_argument(
329 "--input_filename_rise",
330 required=True,
331 help="Path to ser file for helical parameter rise. Accepted formats: ser.",
332 )
333 required_args.add_argument(
334 "--input_filename_tilt",
335 required=True,
336 help="Path to ser file for helical parameter tilt. Accepted formats: ser.",
337 )
338 required_args.add_argument(
339 "--input_filename_roll",
340 required=True,
341 help="Path to ser file for helical parameter roll. Accepted formats: ser.",
342 )
343 required_args.add_argument(
344 "--input_filename_twist",
345 required=True,
346 help="Path to ser file for helical parameter twist. Accepted formats: ser.",
347 )
348 required_args.add_argument(
349 "--output_csv_path",
350 required=True,
351 help="Path to output file. Accepted formats: csv.",
352 )
353 required_args.add_argument(
354 "--output_jpg_path",
355 required=True,
356 help="Path to output plot. Accepted formats: jpg.",
357 )
359 args = parser.parse_args()
360 args.config = args.config or "{}"
361 properties = settings.ConfReader(config=args.config).get_prop_dic()
363 interbpcorr(
364 input_filename_shift=args.input_filename_shift,
365 input_filename_slide=args.input_filename_slide,
366 input_filename_rise=args.input_filename_rise,
367 input_filename_tilt=args.input_filename_tilt,
368 input_filename_roll=args.input_filename_roll,
369 input_filename_twist=args.input_filename_twist,
370 output_csv_path=args.output_csv_path,
371 output_jpg_path=args.output_jpg_path,
372 properties=properties,
373 )
376if __name__ == "__main__":
377 main()