1 #!/usr/bin/env python3
2
3 """Module containing the InterBasePairCorrelation class and the command line interface."""
4
5 import argparse
6 from itertools import product
7 from typing import Optional
8
9 import matplotlib as mpl
10 import matplotlib.pyplot as plt
11 import numpy as np
12 import pandas as pd
13 from biobb_common.configuration import settings
14 from biobb_common.generic.biobb_object import BiobbObject
15 from biobb_common.tools.file_utils import launchlogger
16
17 from biobb_dna.utils import constants
18 from biobb_dna.utils.common import _from_string_to_list
19 from biobb_dna.utils.loader import read_series
20
21
22 class InterBasePairCorrelation(BiobbObject):
23 """
24 | biobb_dna InterBasePairCorrelation
25 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter.
26 | Calculate correlation between neighboring base pairs and pairs of helical parameters.
27
28 Args:
29 input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
30 input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330).
31 input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330).
32 input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330).
33 input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
34 input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330).
35 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
36 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
37 properties (dict):
38 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
39 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
40 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
41 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
42 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
43
44 Examples:
45 This is a use example of how to use the building block from Python::
46
47 from biobb_dna.interbp_correlations.interbpcorr import interbpcorr
48
49 interbpcorr(
50 input_filename_shift='path/to/input/shift.ser',
51 input_filename_slide='path/to/input/slide.ser',
52 input_filename_rise='path/to/input/slide.ser',
53 input_filename_tilt='path/to/input/tilt.ser',
54 input_filename_roll='path/to/input/roll.ser',
55 input_filename_twist='path/to/input/twist.ser',
56 output_csv_path='path/to/output/file.csv',
57 output_jpg_path='path/to/output/plot.jpg',
58 properties=prop)
59 Info:
60 * wrapped_software:
61 * name: In house
62 * license: Apache-2.0
63 * ontology:
64 * name: EDAM
65 * schema: http://edamontology.org/EDAM.owl
66
67 """
68
69 def __init__(
70 self,
71 input_filename_shift,
72 input_filename_slide,
73 input_filename_rise,
74 input_filename_tilt,
75 input_filename_roll,
76 input_filename_twist,
77 output_csv_path,
78 output_jpg_path,
79 properties=None,
80 **kwargs,
81 ) -> None:
82 properties = properties or {}
83
84 # Call parent class constructor
85 super().__init__(properties)
86 self.locals_var_dict = locals().copy()
87
88 # Input/Output files
89 self.io_dict = {
90 "in": {
91 "input_filename_shift": input_filename_shift,
92 "input_filename_slide": input_filename_slide,
93 "input_filename_rise": input_filename_rise,
94 "input_filename_tilt": input_filename_tilt,
95 "input_filename_roll": input_filename_roll,
96 "input_filename_twist": input_filename_twist,
97 },
98 "out": {
99 "output_csv_path": output_csv_path,
100 "output_jpg_path": output_jpg_path,
101 },
102 }
103
104 self.properties = properties
105 self.sequence = properties.get("sequence", None)
106 self.seqpos = [
107 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
108 ]
109
110 # Check the properties
111 self.check_properties(properties)
112 self.check_arguments()
113
114 @launchlogger
115 def launch(self) -> int:
116 """Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object."""
117
118 # Setup Biobb
119 if self.check_restart():
120 return 0
121 self.stage_files()
122
123 # check sequence
124 if self.sequence is None or len(self.sequence) < 2:
125 raise ValueError("sequence is null or too short!")
126
127 # check seqpos
128 if self.seqpos:
129 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
130 raise ValueError("seqpos must be a list of at least two integers")
131 else:
132 self.seqpos = None # type: ignore
133
134 # read input
135 shift = read_series(
136 self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos
137 )
138 slide = read_series(
139 self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos
140 )
141 rise = read_series(
142 self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos
143 )
144 tilt = read_series(
145 self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos
146 )
147 roll = read_series(
148 self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos
149 )
150 twist = read_series(
151 self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos
152 )
153
154 if not self.seqpos:
155 # drop first and last columns
156 shift = shift[shift.columns[1:-2]]
157 slide = slide[slide.columns[1:-2]]
158 rise = rise[rise.columns[1:-2]]
159 tilt = tilt[tilt.columns[1:-2]]
160 roll = roll[roll.columns[1:-2]]
161 twist = twist[twist.columns[1:-2]]
162 labels = [
163 f"{i+1}_{self.sequence[i:i+2]}"
164 for i in range(1, len(shift.columns) + 1)
165 ]
166 corr_index = [
167 f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)
168 ]
169 else:
170 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
171 corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos]
172
173 # rename duplicated subunits
174 shift.columns = labels
175 slide.columns = labels
176 rise.columns = labels
177 tilt.columns = labels
178 roll.columns = labels
179 twist.columns = labels
180
181 # set names to each dataset
182 shift.name = "shift"
183 slide.name = "slide"
184 rise.name = "rise"
185 tilt.name = "tilt"
186 roll.name = "roll"
187 twist.name = "twist"
188
189 # get correlation between neighboring basepairs among all helical parameters
190 results = {}
191 datasets = [shift, slide, rise, tilt, roll, twist]
192 for ser1, ser2 in product(datasets, datasets):
193 ser2_shifted = ser2.shift(axis=1)
194 ser2_shifted[labels[0]] = ser2[labels[-1]]
195 if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular:
196 method = self.circular
197 elif (
198 ser1.name in constants.hp_angular
-
W503
Line break before binary operator
199 and ser2.name not in constants.hp_angular
200 ) or (
201 ser2.name in constants.hp_angular
-
W503
Line break before binary operator
202 and ser1.name not in constants.hp_angular
203 ):
204 method = self.circlineal
205 else:
206 method = "pearson" # type: ignore
207 corr_data = ser1.corrwith(ser2_shifted, method=method)
208 corr_data.index = corr_index
209 results[f"{ser1.name}/{ser2.name}"] = corr_data
210 result_df = pd.DataFrame.from_dict(results)
211 result_df.index = corr_index # type: ignore
212
213 # save csv data
214 result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"])
215
216 # create heatmap
217 cmap = plt.get_cmap("bwr").copy()
218 bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]
219 num = cmap.N
220 norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore
221 cmap.set_bad(color="gainsboro")
222 fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True)
223 im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto")
224 plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1])
225
226 # axes
227 xlocs = np.arange(len(result_df.columns))
228 _ = ax.set_xticks(xlocs)
229 _ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90)
230
231 ylocs = np.arange(len(result_df.index))
232 _ = ax.set_yticks(ylocs)
233 _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore
234
235 ax.set_title(
236 "Correlation for neighboring basepairs " "and pairs of helical parameters"
237 )
238
239 fig.tight_layout()
240 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
241 plt.close()
242
243 # Copy files to host
244 self.copy_to_host()
245
246 # Remove temporary file(s)
247 self.tmp_files.extend(
248 [
249 self.stage_io_dict.get("unique_dir", ""),
250 ]
251 )
252 self.remove_tmp_files()
253
254 self.check_arguments(output_files_created=True, raise_exception=False)
255
256 return 0
257
258 @staticmethod
259 def circular(x1, x2):
260 x1 = x1 * np.pi / 180
261 x2 = x2 * np.pi / 180
262 diff_1 = np.sin(x1 - x1.mean())
263 diff_2 = np.sin(x2 - x2.mean())
264 num = (diff_1 * diff_2).sum()
265 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum())
266 return num / den
267
268 @staticmethod
269 def circlineal(x1, x2):
270 x2 = x2 * np.pi / 180
271 rc = np.corrcoef(x1, np.cos(x2))[1, 0]
272 rs = np.corrcoef(x1, np.sin(x2))[1, 0]
273 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
274 num = (rc**2) + (rs**2) - 2 * rc * rs * rcs
275 den = 1 - (rcs**2)
276 correlation = np.sqrt(num / den)
277 if np.corrcoef(x1, x2)[1, 0] < 0:
278 correlation *= -1
279 return correlation
280
281
282 def interbpcorr(
283 input_filename_shift: str,
284 input_filename_slide: str,
285 input_filename_rise: str,
286 input_filename_tilt: str,
287 input_filename_roll: str,
288 input_filename_twist: str,
289 output_csv_path: str,
290 output_jpg_path: str,
291 properties: Optional[dict] = None,
292 **kwargs,
293 ) -> int:
294 """Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and
295 execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method."""
296
297 return InterBasePairCorrelation(
298 input_filename_shift=input_filename_shift,
299 input_filename_slide=input_filename_slide,
300 input_filename_rise=input_filename_rise,
301 input_filename_tilt=input_filename_tilt,
302 input_filename_roll=input_filename_roll,
303 input_filename_twist=input_filename_twist,
304 output_csv_path=output_csv_path,
305 output_jpg_path=output_jpg_path,
306 properties=properties,
307 **kwargs,
308 ).launch()
309
310
311 def main():
312 """Command line execution of this building block. Please check the command line documentation."""
313 parser = argparse.ArgumentParser(
314 description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.",
315 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
316 )
317 parser.add_argument("--config", required=False, help="Configuration file")
318
319 required_args = parser.add_argument_group("required arguments")
320 required_args.add_argument(
321 "--input_filename_shift",
322 required=True,
323 help="Path to ser file for helical parameter shift. Accepted formats: ser.",
324 )
325 required_args.add_argument(
326 "--input_filename_slide",
327 required=True,
328 help="Path to ser file for helical parameter slide. Accepted formats: ser.",
329 )
330 required_args.add_argument(
331 "--input_filename_rise",
332 required=True,
333 help="Path to ser file for helical parameter rise. Accepted formats: ser.",
334 )
335 required_args.add_argument(
336 "--input_filename_tilt",
337 required=True,
338 help="Path to ser file for helical parameter tilt. Accepted formats: ser.",
339 )
340 required_args.add_argument(
341 "--input_filename_roll",
342 required=True,
343 help="Path to ser file for helical parameter roll. Accepted formats: ser.",
344 )
345 required_args.add_argument(
346 "--input_filename_twist",
347 required=True,
348 help="Path to ser file for helical parameter twist. Accepted formats: ser.",
349 )
350 required_args.add_argument(
351 "--output_csv_path",
352 required=True,
353 help="Path to output file. Accepted formats: csv.",
354 )
355 required_args.add_argument(
356 "--output_jpg_path",
357 required=True,
358 help="Path to output plot. Accepted formats: jpg.",
359 )
360
361 args = parser.parse_args()
362 args.config = args.config or "{}"
363 properties = settings.ConfReader(config=args.config).get_prop_dic()
364
365 interbpcorr(
366 input_filename_shift=args.input_filename_shift,
367 input_filename_slide=args.input_filename_slide,
368 input_filename_rise=args.input_filename_rise,
369 input_filename_tilt=args.input_filename_tilt,
370 input_filename_roll=args.input_filename_roll,
371 input_filename_twist=args.input_filename_twist,
372 output_csv_path=args.output_csv_path,
373 output_jpg_path=args.output_jpg_path,
374 properties=properties,
375 )
376
377
378 if __name__ == "__main__":
379 main()