⬅ biobb_dna/interbp_correlations/interbpcorr.py source

1 #!/usr/bin/env python3
2  
3 """Module containing the InterBasePairCorrelation class and the command line interface."""
4  
5 import argparse
6 from itertools import product
7 from typing import Optional
8  
9 import matplotlib as mpl
10 import matplotlib.pyplot as plt
11 import numpy as np
12 import pandas as pd
13 from biobb_common.configuration import settings
14 from biobb_common.generic.biobb_object import BiobbObject
15 from biobb_common.tools.file_utils import launchlogger
16  
17 from biobb_dna.utils import constants
18 from biobb_dna.utils.common import _from_string_to_list
19 from biobb_dna.utils.loader import read_series
20  
21  
22 class InterBasePairCorrelation(BiobbObject):
23 """
24 | biobb_dna InterBasePairCorrelation
25 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter.
26 | Calculate correlation between neighboring base pairs and pairs of helical parameters.
27  
28 Args:
29 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
30 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330).
31 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330).
32 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330).
33 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
34 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330).
35 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
36 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
37 properties (dict):
38 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
39 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
40 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
41 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
42 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
43  
44 Examples:
45 This is a use example of how to use the building block from Python::
46  
47 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr
48  
49 interbpcorr(
50 input_filename_shift='path/to/input/shift.ser',
51 input_filename_slide='path/to/input/slide.ser',
52 input_filename_rise='path/to/input/slide.ser',
53 input_filename_tilt='path/to/input/tilt.ser',
54 input_filename_roll='path/to/input/roll.ser',
55 input_filename_twist='path/to/input/twist.ser',
56 output_csv_path='path/to/output/file.csv',
57 output_jpg_path='path/to/output/plot.jpg',
58 properties=prop)
59 Info:
60 * wrapped_software:
61 * name: In house
62 * license: Apache-2.0
63 * ontology:
64 * name: EDAM
65 * schema: http://edamontology.org/EDAM.owl
66  
67 """
68  
69 def __init__(
70 self,
71 input_filename_shift,
72 input_filename_slide,
73 input_filename_rise,
74 input_filename_tilt,
75 input_filename_roll,
76 input_filename_twist,
77 output_csv_path,
78 output_jpg_path,
79 properties=None,
80 **kwargs,
81 ) -> None:
82 properties = properties or {}
83  
84 # Call parent class constructor
85 super().__init__(properties)
86 self.locals_var_dict = locals().copy()
87  
88 # Input/Output files
89 self.io_dict = {
90 "in": {
91 "input_filename_shift": input_filename_shift,
92 "input_filename_slide": input_filename_slide,
93 "input_filename_rise": input_filename_rise,
94 "input_filename_tilt": input_filename_tilt,
95 "input_filename_roll": input_filename_roll,
96 "input_filename_twist": input_filename_twist,
97 },
98 "out": {
99 "output_csv_path": output_csv_path,
100 "output_jpg_path": output_jpg_path,
101 },
102 }
103  
104 self.properties = properties
105 self.sequence = properties.get("sequence", None)
106 self.seqpos = [
107 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
108 ]
109  
110 # Check the properties
111 self.check_properties(properties)
112 self.check_arguments()
113  
114 @launchlogger
115 def launch(self) -> int:
116 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object."""
117  
118 # Setup Biobb
119 if self.check_restart():
120 return 0
121 self.stage_files()
122  
123 # check sequence
124 if self.sequence is None or len(self.sequence) < 2:
125 raise ValueError("sequence is null or too short!")
126  
127 # check seqpos
128 if self.seqpos:
129 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
130 raise ValueError("seqpos must be a list of at least two integers")
131 else:
132 self.seqpos = None # type: ignore
133  
134 # read input
135 shift = read_series(
136 self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos
137 )
138 slide = read_series(
139 self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos
140 )
141 rise = read_series(
142 self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos
143 )
144 tilt = read_series(
145 self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos
146 )
147 roll = read_series(
148 self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos
149 )
150 twist = read_series(
151 self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos
152 )
153  
154 if not self.seqpos:
155 # drop first and last columns
156 shift = shift[shift.columns[1:-2]]
157 slide = slide[slide.columns[1:-2]]
158 rise = rise[rise.columns[1:-2]]
159 tilt = tilt[tilt.columns[1:-2]]
160 roll = roll[roll.columns[1:-2]]
161 twist = twist[twist.columns[1:-2]]
162 labels = [
163 f"{i+1}_{self.sequence[i:i+2]}"
164 for i in range(1, len(shift.columns) + 1)
165 ]
166 corr_index = [
167 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)
168 ]
169 else:
170 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
171 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos]
172  
173 # rename duplicated subunits
174 shift.columns = labels
175 slide.columns = labels
176 rise.columns = labels
177 tilt.columns = labels
178 roll.columns = labels
179 twist.columns = labels
180  
181 # set names to each dataset
182 shift.name = "shift"
183 slide.name = "slide"
184 rise.name = "rise"
185 tilt.name = "tilt"
186 roll.name = "roll"
187 twist.name = "twist"
188  
189 # get correlation between neighboring basepairs among all helical parameters
190 results = {}
191 datasets = [shift, slide, rise, tilt, roll, twist]
192 for ser1, ser2 in product(datasets, datasets):
193 ser2_shifted = ser2.shift(axis=1)
194 ser2_shifted[labels[0]] = ser2[labels[-1]]
195 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular:
196 method = self.circular
197 elif (
198 ser1.name in constants.hp_angular
  • W503 Line break before binary operator
199 and ser2.name not in constants.hp_angular
200 ) or (
201 ser2.name in constants.hp_angular
  • W503 Line break before binary operator
202 and ser1.name not in constants.hp_angular
203 ):
204 method = self.circlineal
205 else:
206 method = "pearson" # type: ignore
207 corr_data = ser1.corrwith(ser2_shifted, method=method)
208 corr_data.index = corr_index
209 results[f"{ser1.name}/{ser2.name}"] = corr_data
210 result_df = pd.DataFrame.from_dict(results)
211 result_df.index = corr_index # type: ignore
212  
213 # save csv data
214 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"])
215  
216 # create heatmap
217 cmap = plt.get_cmap("bwr").copy()
218 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]
219 num = cmap.N
220 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore
221 cmap.set_bad(color="gainsboro")
222 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True)
223 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto")
224 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1])
225  
226 # axes
227 xlocs = np.arange(len(result_df.columns))
228 _ = ax.set_xticks(xlocs)
229 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90)
230  
231 ylocs = np.arange(len(result_df.index))
232 _ = ax.set_yticks(ylocs)
233 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore
234  
235 ax.set_title(
236 "Correlation for neighboring basepairs " "and pairs of helical parameters"
237 )
238  
239 fig.tight_layout()
240 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
241 plt.close()
242  
243 # Copy files to host
244 self.copy_to_host()
245  
246 # Remove temporary file(s)
247 self.tmp_files.extend(
248 [
249 self.stage_io_dict.get("unique_dir", ""),
250 ]
251 )
252 self.remove_tmp_files()
253  
254 self.check_arguments(output_files_created=True, raise_exception=False)
255  
256 return 0
257  
258 @staticmethod
259 def circular(x1, x2):
260 x1 = x1 * np.pi / 180
261 x2 = x2 * np.pi / 180
262 diff_1 = np.sin(x1 - x1.mean())
263 diff_2 = np.sin(x2 - x2.mean())
264 num = (diff_1 * diff_2).sum()
265 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum())
266 return num / den
267  
268 @staticmethod
269 def circlineal(x1, x2):
270 x2 = x2 * np.pi / 180
271 rc = np.corrcoef(x1, np.cos(x2))[1, 0]
272 rs = np.corrcoef(x1, np.sin(x2))[1, 0]
273 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
274 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs
275 den = 1 - (rcs**2)
276 correlation = np.sqrt(num / den)
277 if np.corrcoef(x1, x2)[1, 0] < 0:
278 correlation *= -1
279 return correlation
280  
281  
282 def interbpcorr(
283 input_filename_shift: str,
284 input_filename_slide: str,
285 input_filename_rise: str,
286 input_filename_tilt: str,
287 input_filename_roll: str,
288 input_filename_twist: str,
289 output_csv_path: str,
290 output_jpg_path: str,
291 properties: Optional[dict] = None,
292 **kwargs,
293 ) -> int:
294 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and
295 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method."""
296  
297 return InterBasePairCorrelation(
298 input_filename_shift=input_filename_shift,
299 input_filename_slide=input_filename_slide,
300 input_filename_rise=input_filename_rise,
301 input_filename_tilt=input_filename_tilt,
302 input_filename_roll=input_filename_roll,
303 input_filename_twist=input_filename_twist,
304 output_csv_path=output_csv_path,
305 output_jpg_path=output_jpg_path,
306 properties=properties,
307 **kwargs,
308 ).launch()
309  
310  
311 def main():
312 """Command line execution of this building block. Please check the command line documentation."""
313 parser = argparse.ArgumentParser(
314 description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.",
315 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
316 )
317 parser.add_argument("--config", required=False, help="Configuration file")
318  
319 required_args = parser.add_argument_group("required arguments")
320 required_args.add_argument(
321 "--input_filename_shift",
322 required=True,
323 help="Path to ser file for helical parameter shift. Accepted formats: ser.",
324 )
325 required_args.add_argument(
326 "--input_filename_slide",
327 required=True,
328 help="Path to ser file for helical parameter slide. Accepted formats: ser.",
329 )
330 required_args.add_argument(
331 "--input_filename_rise",
332 required=True,
333 help="Path to ser file for helical parameter rise. Accepted formats: ser.",
334 )
335 required_args.add_argument(
336 "--input_filename_tilt",
337 required=True,
338 help="Path to ser file for helical parameter tilt. Accepted formats: ser.",
339 )
340 required_args.add_argument(
341 "--input_filename_roll",
342 required=True,
343 help="Path to ser file for helical parameter roll. Accepted formats: ser.",
344 )
345 required_args.add_argument(
346 "--input_filename_twist",
347 required=True,
348 help="Path to ser file for helical parameter twist. Accepted formats: ser.",
349 )
350 required_args.add_argument(
351 "--output_csv_path",
352 required=True,
353 help="Path to output file. Accepted formats: csv.",
354 )
355 required_args.add_argument(
356 "--output_jpg_path",
357 required=True,
358 help="Path to output plot. Accepted formats: jpg.",
359 )
360  
361 args = parser.parse_args()
362 args.config = args.config or "{}"
363 properties = settings.ConfReader(config=args.config).get_prop_dic()
364  
365 interbpcorr(
366 input_filename_shift=args.input_filename_shift,
367 input_filename_slide=args.input_filename_slide,
368 input_filename_rise=args.input_filename_rise,
369 input_filename_tilt=args.input_filename_tilt,
370 input_filename_roll=args.input_filename_roll,
371 input_filename_twist=args.input_filename_twist,
372 output_csv_path=args.output_csv_path,
373 output_jpg_path=args.output_jpg_path,
374 properties=properties,
375 )
376  
377  
378 if __name__ == "__main__":
379 main()