Coverage for biobb_dna/interbp_correlations/interbpcorr.py: 85%
144 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-07 09:06 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-07 09:06 +0000
1#!/usr/bin/env python3
3"""Module containing the InterBasePairCorrelation class and the command line interface."""
4import argparse
5from itertools import product
7import numpy as np
8import pandas as pd
9import matplotlib as mpl
10import matplotlib.pyplot as plt
12from biobb_common.generic.biobb_object import BiobbObject
13from biobb_common.configuration import settings
14from biobb_common.tools import file_utils as fu
15from biobb_common.tools.file_utils import launchlogger
16from biobb_dna.utils.loader import read_series
17from biobb_dna.utils import constants
20class InterBasePairCorrelation(BiobbObject):
21 """
22 | biobb_dna InterBasePairCorrelation
23 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter.
25 Args:
26 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
27 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330).
28 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330).
29 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330).
30 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
31 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330).
32 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
33 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
34 properties (dict):
35 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
36 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
37 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
38 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
40 Examples:
41 This is a use example of how to use the building block from Python::
43 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr
45 interbpcorr(
46 input_filename_shift='path/to/input/shift.ser',
47 input_filename_slide='path/to/input/slide.ser',
48 input_filename_rise='path/to/input/slide.ser',
49 input_filename_tilt='path/to/input/tilt.ser',
50 input_filename_roll='path/to/input/roll.ser',
51 input_filename_twist='path/to/input/twist.ser',
52 output_csv_path='path/to/output/file.csv',
53 output_jpg_path='path/to/output/plot.jpg',
54 properties=prop)
55 Info:
56 * wrapped_software:
57 * name: In house
58 * license: Apache-2.0
59 * ontology:
60 * name: EDAM
61 * schema: http://edamontology.org/EDAM.owl
63 """
65 def __init__(
66 self, input_filename_shift, input_filename_slide,
67 input_filename_rise, input_filename_tilt,
68 input_filename_roll, input_filename_twist,
69 output_csv_path, output_jpg_path,
70 properties=None, **kwargs) -> None:
71 properties = properties or {}
73 # Call parent class constructor
74 super().__init__(properties)
75 self.locals_var_dict = locals().copy()
77 # Input/Output files
78 self.io_dict = {
79 'in': {
80 'input_filename_shift': input_filename_shift,
81 'input_filename_slide': input_filename_slide,
82 'input_filename_rise': input_filename_rise,
83 'input_filename_tilt': input_filename_tilt,
84 'input_filename_roll': input_filename_roll,
85 'input_filename_twist': input_filename_twist
86 },
87 'out': {
88 'output_csv_path': output_csv_path,
89 'output_jpg_path': output_jpg_path
90 }
91 }
93 self.properties = properties
94 self.sequence = properties.get("sequence", None)
95 self.seqpos = properties.get("seqpos", None)
97 # Check the properties
98 self.check_properties(properties)
99 self.check_arguments()
101 @launchlogger
102 def launch(self) -> int:
103 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object."""
105 # Setup Biobb
106 if self.check_restart():
107 return 0
108 self.stage_files()
110 # check sequence
111 if self.sequence is None or len(self.sequence) < 2:
112 raise ValueError("sequence is null or too short!")
114 # check seqpos
115 if self.seqpos is not None:
116 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
117 raise ValueError(
118 "seqpos must be a list of at least two integers")
120 # Creating temporary folder
121 self.tmp_folder = fu.create_unique_dir(prefix="bpcorrelation_")
122 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log)
124 # read input
125 shift = read_series(
126 self.io_dict["in"]["input_filename_shift"], usecols=self.seqpos)
127 slide = read_series(
128 self.io_dict["in"]["input_filename_slide"], usecols=self.seqpos)
129 rise = read_series(
130 self.io_dict["in"]["input_filename_rise"], usecols=self.seqpos)
131 tilt = read_series(
132 self.io_dict["in"]["input_filename_tilt"], usecols=self.seqpos)
133 roll = read_series(
134 self.io_dict["in"]["input_filename_roll"], usecols=self.seqpos)
135 twist = read_series(
136 self.io_dict["in"]["input_filename_twist"], usecols=self.seqpos)
138 if self.seqpos is None:
139 # drop first and last columns
140 shift = shift[shift.columns[1:-2]]
141 slide = slide[slide.columns[1:-2]]
142 rise = rise[rise.columns[1:-2]]
143 tilt = tilt[tilt.columns[1:-2]]
144 roll = roll[roll.columns[1:-2]]
145 twist = twist[twist.columns[1:-2]]
146 labels = [
147 f"{i+1}_{self.sequence[i:i+2]}" for i in range(1, len(shift.columns) + 1)]
148 corr_index = [
149 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)]
150 else:
151 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
152 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos]
154 # rename duplicated subunits
155 shift.columns = labels
156 slide.columns = labels
157 rise.columns = labels
158 tilt.columns = labels
159 roll.columns = labels
160 twist.columns = labels
162 # set names to each dataset
163 shift.name = "shift"
164 slide.name = "slide"
165 rise.name = "rise"
166 tilt.name = "tilt"
167 roll.name = "roll"
168 twist.name = "twist"
170 # get correlation between neighboring basepairs among all helical parameters
171 results = {}
172 datasets = [shift, slide, rise, tilt, roll, twist]
173 for ser1, ser2 in product(datasets, datasets):
174 ser2_shifted = ser2.shift(axis=1)
175 ser2_shifted[labels[0]] = ser2[labels[-1]]
176 if (
177 ser1.name in constants.hp_angular and ser2.name in constants.hp_angular):
178 method = self.circular
179 elif (
180 (
181 ser1.name in constants.hp_angular and not (
182 ser2.name in constants.hp_angular)
183 ) or (
184 ser2.name in constants.hp_angular and not (
185 ser1.name in constants.hp_angular)
186 )
187 ):
188 method = self.circlineal
189 else:
190 method = "pearson"
191 corr_data = ser1.corrwith(ser2_shifted, method=method)
192 corr_data.index = corr_index
193 results[f"{ser1.name}/{ser2.name}"] = corr_data
194 result_df = pd.DataFrame.from_dict(results)
195 result_df.index = corr_index
197 # save csv data
198 result_df.to_csv(self.io_dict["out"]["output_csv_path"])
200 # create heatmap
201 cmap = plt.get_cmap("bwr").copy()
202 bounds = [-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1]
203 num = cmap.N
204 norm = mpl.colors.BoundaryNorm(bounds, num)
205 cmap.set_bad(color="gainsboro")
206 fig, ax = plt.subplots(
207 1,
208 1,
209 dpi=300,
210 figsize=(7.5, 5),
211 tight_layout=True)
212 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect='auto')
213 plt.colorbar(im, ticks=[-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1])
215 # axes
216 xlocs = np.arange(len(result_df.columns))
217 _ = ax.set_xticks(xlocs)
218 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90)
220 ylocs = np.arange(len(result_df.index))
221 _ = ax.set_yticks(ylocs)
222 _ = ax.set_yticklabels(result_df.index.to_list())
224 ax.set_title(
225 "Correlation for neighboring basepairs "
226 "and pairs of helical parameters")
228 fig.tight_layout()
229 fig.savefig(
230 self.io_dict['out']['output_jpg_path'],
231 format="jpg")
232 plt.close()
234 # Remove temporary file(s)
235 self.tmp_files.extend([
236 self.stage_io_dict.get("unique_dir"),
237 self.tmp_folder
238 ])
239 self.remove_tmp_files()
241 self.check_arguments(output_files_created=True, raise_exception=False)
243 return 0
245 @staticmethod
246 def circular(x1, x2):
247 x1 = x1 * np.pi / 180
248 x2 = x2 * np.pi / 180
249 diff_1 = np.sin(x1 - x1.mean())
250 diff_2 = np.sin(x2 - x2.mean())
251 num = (diff_1 * diff_2).sum()
252 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum())
253 return num / den
255 @staticmethod
256 def circlineal(x1, x2):
257 x2 = x2 * np.pi / 180
258 rc = np.corrcoef(x1, np.cos(x2))[1, 0]
259 rs = np.corrcoef(x1, np.sin(x2))[1, 0]
260 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
261 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs
262 den = 1 - (rcs ** 2)
263 correlation = np.sqrt(num / den)
264 if np.corrcoef(x1, x2)[1, 0] < 0:
265 correlation *= -1
266 return correlation
269def interbpcorr(
270 input_filename_shift: str, input_filename_slide: str,
271 input_filename_rise: str, input_filename_tilt: str,
272 input_filename_roll: str, input_filename_twist: str,
273 output_csv_path: str, output_jpg_path: str,
274 properties: dict = None, **kwargs) -> int:
275 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and
276 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method."""
278 return InterBasePairCorrelation(
279 input_filename_shift=input_filename_shift,
280 input_filename_slide=input_filename_slide,
281 input_filename_rise=input_filename_rise,
282 input_filename_tilt=input_filename_tilt,
283 input_filename_roll=input_filename_roll,
284 input_filename_twist=input_filename_twist,
285 output_csv_path=output_csv_path,
286 output_jpg_path=output_jpg_path,
287 properties=properties, **kwargs).launch()
290def main():
291 """Command line execution of this building block. Please check the command line documentation."""
292 parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.',
293 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
294 parser.add_argument('--config', required=False, help='Configuration file')
296 required_args = parser.add_argument_group('required arguments')
297 required_args.add_argument('--input_filename_shift', required=True,
298 help='Path to ser file for helical parameter shift. Accepted formats: ser.')
299 required_args.add_argument('--input_filename_slide', required=True,
300 help='Path to ser file for helical parameter slide. Accepted formats: ser.')
301 required_args.add_argument('--input_filename_rise', required=True,
302 help='Path to ser file for helical parameter rise. Accepted formats: ser.')
303 required_args.add_argument('--input_filename_tilt', required=True,
304 help='Path to ser file for helical parameter tilt. Accepted formats: ser.')
305 required_args.add_argument('--input_filename_roll', required=True,
306 help='Path to ser file for helical parameter roll. Accepted formats: ser.')
307 required_args.add_argument('--input_filename_twist', required=True,
308 help='Path to ser file for helical parameter twist. Accepted formats: ser.')
309 required_args.add_argument('--output_csv_path', required=True,
310 help='Path to output file. Accepted formats: csv.')
311 required_args.add_argument('--output_jpg_path', required=True,
312 help='Path to output plot. Accepted formats: jpg.')
314 args = parser.parse_args()
315 args.config = args.config or "{}"
316 properties = settings.ConfReader(config=args.config).get_prop_dic()
318 interbpcorr(
319 input_filename_shift=args.input_filename_shift,
320 input_filename_slide=args.input_filename_slide,
321 input_filename_rise=args.input_filename_rise,
322 input_filename_tilt=args.input_filename_tilt,
323 input_filename_roll=args.input_filename_roll,
324 input_filename_twist=args.input_filename_twist,
325 output_csv_path=args.output_csv_path,
326 output_jpg_path=args.output_jpg_path,
327 properties=properties)
330if __name__ == '__main__':
331 main()