Coverage for biobb_dna/dna/dna_averages.py: 72%
95 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
1#!/usr/bin/env python3
3"""Module containing the HelParAverages class and the command line interface."""
5import argparse
6from pathlib import Path
7from typing import Optional
9import matplotlib.pyplot as plt
10import pandas as pd
11from biobb_common.configuration import settings
12from biobb_common.generic.biobb_object import BiobbObject
13from biobb_common.tools.file_utils import launchlogger
15from biobb_dna.utils import constants
16from biobb_dna.utils.common import _from_string_to_list
17from biobb_dna.utils.loader import read_series
20class HelParAverages(BiobbObject):
21 """
22 | biobb_dna HelParAverages
23 | Load .ser file for a given helical parameter and read each column corresponding to a base calculating average over each one.
24 | Calculate average values for each base pair and save them in a .csv file.
26 Args:
27 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
28 output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.csv>`_. Accepted formats: csv (edam:format_3752).
29 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.jpg>`_. Accepted formats: jpg (edam:format_3579).
30 properties (dict):
31 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
32 * **helpar_name** (*str*) - (Optional) helical parameter name.
33 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series.
34 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
35 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
36 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
37 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
38 Examples:
39 This is a use example of how to use the building block from Python::
41 from biobb_dna.dna.dna_averages import dna_averages
43 prop = {
44 'helpar_name': 'twist',
45 'seqpos': [1,2],
46 'sequence': 'GCAT'
47 }
48 dna_averages(
49 input_ser_path='/path/to/twist.ser',
50 output_csv_path='/path/to/table/output.csv',
51 output_jpg_path='/path/to/table/output.jpg',
52 properties=prop)
54 Info:
55 * wrapped_software:
56 * name: In house
57 * license: Apache-2.0
58 * ontology:
59 * name: EDAM
60 * schema: http://edamontology.org/EDAM.owl
62 """
64 def __init__(
65 self,
66 input_ser_path,
67 output_csv_path,
68 output_jpg_path,
69 properties=None,
70 **kwargs,
71 ) -> None:
72 properties = properties or {}
74 # Call parent class constructor
75 super().__init__(properties)
76 self.locals_var_dict = locals().copy()
78 # Input/Output files
79 self.io_dict = {
80 "in": {
81 "input_ser_path": input_ser_path,
82 },
83 "out": {
84 "output_csv_path": output_csv_path,
85 "output_jpg_path": output_jpg_path,
86 },
87 }
89 # Properties specific for BB
90 self.properties = properties
91 self.sequence = properties.get("sequence", None)
92 self.stride = properties.get("stride", 1000)
93 self.seqpos = [
94 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
95 ]
96 self.helpar_name = properties.get("helpar_name", None)
98 # Check the properties
99 self.check_properties(properties)
100 self.check_arguments()
102 @launchlogger
103 def launch(self) -> int:
104 """Execute the :class:`HelParAverages <dna.averages.HelParAverages>` object."""
106 # Setup Biobb
107 if self.check_restart():
108 return 0
109 self.stage_files()
111 # check sequence
112 if self.sequence is None or len(self.sequence) < 2:
113 raise ValueError("sequence is null or too short!")
115 # get helical parameter from filename if not specified
116 if self.helpar_name is None:
117 for hp in constants.helical_parameters:
118 ser_name = Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower()
119 if hp.lower() in ser_name:
120 self.helpar_name = hp
121 if self.helpar_name is None:
122 raise ValueError(
123 "Helical parameter name can't be inferred from file, "
124 "so it must be specified!"
125 )
126 else:
127 if self.helpar_name not in constants.helical_parameters:
128 raise ValueError(
129 "Helical parameter name is invalid! "
130 f"Options: {constants.helical_parameters}"
131 )
133 # get base length and unit from helical parameter name
134 if self.helpar_name.lower() in constants.hp_basepairs:
135 self.baselen = 1
136 elif self.helpar_name.lower() in constants.hp_singlebases:
137 self.baselen = 0
138 if self.helpar_name in constants.hp_angular:
139 self.hp_unit = "Degrees"
140 else:
141 self.hp_unit = "Angstroms"
143 # check seqpos
144 if self.seqpos:
145 if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1):
146 raise ValueError(
147 f"seqpos values must be between 1 and {len(self.sequence) - 2}"
148 )
149 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
150 raise ValueError("seqpos must be a list of at least two integers")
151 else:
152 self.seqpos = None # type: ignore
154 # read input .ser file
155 ser_data = read_series(
156 self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos
157 )
158 if not self.seqpos:
159 ser_data = ser_data[ser_data.columns[1:-1]]
160 # discard first and last base(pairs) from sequence
161 sequence = self.sequence[1:]
162 xlabels = [
163 f"{sequence[i:i+1+self.baselen]}"
164 for i in range(len(ser_data.columns) - self.baselen)
165 ]
166 else:
167 sequence = self.sequence
168 xlabels = [f"{sequence[i:i+1+self.baselen]}" for i in self.seqpos]
170 # rename duplicated subunits
171 while any(pd.Index(ser_data.columns).duplicated()):
172 ser_data.columns = [
173 name if not duplicated else name + "_dup"
174 for duplicated, name in zip(
175 pd.Index(ser_data.columns).duplicated(), ser_data.columns
176 )
177 ]
179 # write output files for all selected bases
180 means = ser_data.mean(axis=0).iloc[: len(xlabels)]
181 stds = ser_data.std(axis=0).iloc[: len(xlabels)]
183 # save plot
184 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
185 axs.errorbar(
186 means.index, means.to_numpy(), yerr=stds.to_numpy(), marker="o", capsize=5
187 )
188 axs.set_xticks(means.index)
189 axs.set_xticklabels(xlabels, rotation=90)
190 axs.set_xlabel("Sequence Base Pair " f"{'Step' if self.baselen==1 else ''}")
191 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
192 axs.set_title(
193 "Base Pair "
194 f"{'Step' if self.baselen==1 else ''} "
195 f"Helical Parameter: {self.helpar_name.capitalize()}"
196 )
197 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
199 # save table
200 dataset = pd.DataFrame(
201 {
202 f"Base Pair {'Step' if self.baselen==1 else ''}": xlabels,
203 "mean": means.to_numpy(),
204 "std": stds.to_numpy(),
205 }
206 )
207 dataset.to_csv(self.stage_io_dict["out"]["output_csv_path"], index=False)
209 plt.close()
211 # Copy files to host
212 self.copy_to_host()
214 # Remove temporary file(s)
215 # self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")])
216 self.remove_tmp_files()
218 self.check_arguments(output_files_created=True, raise_exception=False)
220 return 0
223def dna_averages(
224 input_ser_path: str,
225 output_csv_path: str,
226 output_jpg_path: str,
227 properties: Optional[dict] = None,
228 **kwargs,
229) -> int:
230 """Create :class:`HelParAverages <dna.dna_averages.HelParAverages>` class and
231 execute the :meth:`launch() <dna.dna_averages.HelParAverages.launch>` method."""
233 return HelParAverages(
234 input_ser_path=input_ser_path,
235 output_csv_path=output_csv_path,
236 output_jpg_path=output_jpg_path,
237 properties=properties,
238 **kwargs,
239 ).launch()
241 dna_averages.__doc__ = HelParAverages.__doc__
244def main():
245 """Command line execution of this building block. Please check the command line documentation."""
246 parser = argparse.ArgumentParser(
247 description="Load helical parameter file and calculate average values for each base pair.",
248 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
249 )
250 parser.add_argument("--config", required=False, help="Configuration file")
252 required_args = parser.add_argument_group("required arguments")
253 required_args.add_argument(
254 "--input_ser_path",
255 required=True,
256 help="Helical parameter input ser file path. Accepted formats: ser.",
257 )
258 required_args.add_argument(
259 "--output_csv_path",
260 required=True,
261 help="Path to output csv file. Accepted formats: csv.",
262 )
263 required_args.add_argument(
264 "--output_jpg_path",
265 required=True,
266 help="Path to output jpg file. Accepted formats: jpg.",
267 )
269 args = parser.parse_args()
270 args.config = args.config or "{}"
271 properties = settings.ConfReader(config=args.config).get_prop_dic()
273 dna_averages(
274 input_ser_path=args.input_ser_path,
275 output_csv_path=args.output_csv_path,
276 output_jpg_path=args.output_jpg_path,
277 properties=properties,
278 )
281if __name__ == "__main__":
282 main()