Coverage for biobb_dna/dna/dna_timeseries.py: 76%
131 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 10:36 +0000
1#!/usr/bin/env python3
3"""Module containing the HelParTimeSeries class and the command line interface."""
5import argparse
6import re
7import zipfile
8from pathlib import Path
9from typing import Optional
11import matplotlib.pyplot as plt
12import pandas as pd
13from biobb_common.configuration import settings
14from biobb_common.generic.biobb_object import BiobbObject
15from biobb_common.tools import file_utils as fu
16from biobb_common.tools.file_utils import launchlogger
18from biobb_dna.utils import constants
19from biobb_dna.utils.common import _from_string_to_list
20from biobb_dna.utils.loader import read_series
23class HelParTimeSeries(BiobbObject):
24 """
25 | biobb_dna HelParTimeSeries
26 | Created time series and histogram plots for each base pair from a helical parameter series file.
27 | The helical parameter series file is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair.
29 Args:
30 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
31 output_zip_path (str): Path to output .zip files where data is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/timeseries_output.zip>`_. Accepted formats: zip (edam:format_3987).
32 properties (dict):
33 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *usecols* option).
34 * **bins** (*int*) - (None) Bins for histogram. Parameter has same options as matplotlib.pyplot.hist.
35 * **helpar_name** (*str*) - (None) Helical parameter name. It must match the name of the helical parameter in the .ser input file. Values: majd, majw, mind, minw, inclin, tip, xdisp, ydisp, shear, stretch, stagger, buckle, propel, opening, rise, roll, twist, shift, slide, tilt, alphaC, alphaW, betaC, betaW, gammaC, gammaW, deltaC, deltaW, epsilC, epsilW, zetaC, zetaW, chiC, chiW, phaseC, phaseW.
36 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series.
37 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 1) to analyze. If not specified it will analyse the complete sequence.
38 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
39 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
40 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
42 Examples:
43 This is a use example of how to use the building block from Python::
45 from biobb_dna.dna.dna_timeseries import dna_timeseries
47 prop = {
48 'helpar_name': 'twist',
49 'seqpos': [1,2,3,4,5],
50 'sequence': 'GCAACGTGCTATGGAAGC',
51 }
52 dna_timeseries(
53 input_ser_path='/path/to/twist.ser',
54 output_zip_path='/path/to/output/file.zip'
55 properties=prop)
56 Info:
57 * wrapped_software:
58 * name: In house
59 * license: Apache-2.0
60 * ontology:
61 * name: EDAM
62 * schema: http://edamontology.org/EDAM.owl
64 """
66 def __init__(
67 self, input_ser_path, output_zip_path, properties=None, **kwargs
68 ) -> None:
69 properties = properties or {}
71 # Call parent class constructor
72 super().__init__(properties)
73 self.locals_var_dict = locals().copy()
75 # Input/Output files
76 self.io_dict = {
77 "in": {
78 "input_ser_path": input_ser_path,
79 },
80 "out": {"output_zip_path": output_zip_path},
81 }
83 self.properties = properties
84 self.sequence = properties.get("sequence", None)
85 self.bins = properties.get("bins", "auto")
86 self.stride = properties.get("stride", 10)
87 self.seqpos = [
88 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
89 ]
90 self.helpar_name = properties.get("helpar_name", None)
92 # get helical parameter from filename if not specified
93 if self.helpar_name is None:
94 for hp in constants.helical_parameters:
95 if hp.lower() in Path(input_ser_path).name.lower():
96 self.helpar_name = hp
97 if self.helpar_name is None:
98 raise ValueError(
99 "Helical parameter name can't be inferred from file, "
100 "so it must be specified!"
101 )
102 else:
103 if self.helpar_name not in constants.helical_parameters:
104 raise ValueError(
105 "Helical parameter name is invalid! "
106 f"Options: {constants.helical_parameters}"
107 )
109 # get base length from helical parameter name
110 if self.helpar_name.lower() in constants.hp_singlebases:
111 self.baselen = 0
112 else:
113 self.baselen = 1
114 # get unit from helical parameter name
115 if self.helpar_name in constants.hp_angular:
116 self.hp_unit = "Degrees"
117 else:
118 self.hp_unit = "Angstroms"
120 # Check the properties
121 self.check_properties(properties)
122 self.check_arguments()
124 @launchlogger
125 def launch(self) -> int:
126 """Execute the :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` object."""
128 # Setup Biobb
129 if self.check_restart():
130 return 0
131 self.stage_files()
133 # check sequence
134 if self.sequence is None or len(self.sequence) < 2:
135 raise ValueError("sequence is null or too short!")
137 # calculate cols with 0 index
138 if self.seqpos:
139 cols = [i - 1 for i in self.seqpos]
140 else:
141 cols = list(range(len(self.sequence)))
143 # sort cols in ascending order
144 cols.sort()
146 # check seqpos for base pairs
147 if self.seqpos and self.helpar_name in constants.hp_basepairs:
148 if (max(cols) > len(self.sequence) - 2) or (min(cols) < 0):
149 raise ValueError(
150 f"seqpos values must be between 1 and {len(self.sequence) - 1}"
151 )
152 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
153 raise ValueError("seqpos must be a list of at least two integers")
154 # check seqpos for non base pairs
155 elif self.seqpos and self.helpar_name not in constants.hp_basepairs:
156 if (max(cols) > len(self.sequence) - 1) or (min(cols) < 0):
157 raise ValueError(
158 f"seqpos values must be between 1 and {len(self.sequence)}"
159 )
160 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
161 raise ValueError("seqpos must be a list of at least two integers")
163 if self.helpar_name in constants.hp_basepairs:
164 # remove first and last base pairs from cols if they match 0 and len(sequence)
165 if min(cols) == 0:
166 cols.pop(0)
167 if max(cols) == len(self.sequence) - 1:
168 cols.pop(-1)
170 # discard first and last base(pairs) from sequence
171 sequence = self.sequence[1:-1]
172 # create indices list
173 indices = cols.copy()
174 # create subunits list from cols
175 subunits = [f"{i+1}_{sequence[i-1:i+self.baselen]}" for i in cols]
176 # clean subunits (leave only basepairs)
177 pattern = re.compile(r"\d+_[A-Za-z]{2}")
178 # get removed items
179 removed_items = [s for s in subunits if not pattern.fullmatch(s)]
180 # get indices of removed items (in integer format and starting from 0)
181 removed_numbers = [
182 int(match.group())
183 for item in removed_items
184 if (match := re.match(r"\d+", item))
185 ]
186 removed_numbers = list(map(int, removed_numbers))
187 removed_numbers = [int(i) - 1 for i in removed_numbers]
188 # remove non basepairs from subunits and indices
189 subunits = [s for s in subunits if pattern.fullmatch(s)]
190 indices = [i for i in indices if i not in removed_numbers]
191 else:
192 sequence = self.sequence
193 # create indices list
194 indices = cols.copy()
195 # trick for getting the index column from the .ser file
196 indices.insert(0, 0)
197 # create subunits list from cols
198 subunits = [f"{i+1}_{sequence[i:i+1+self.baselen]}" for i in cols]
200 # read input .ser file
201 ser_data = read_series(
202 self.stage_io_dict["in"]["input_ser_path"], usecols=indices
203 )
205 # get columns for selected bases
206 ser_data.columns = subunits
208 # write output files for all selected bases (one per column)
209 zf = zipfile.ZipFile(Path(self.stage_io_dict["out"]["output_zip_path"]), "w")
210 for col in ser_data.columns:
211 # unstack columns to prevent errors from repeated base pairs
212 column_data = ser_data[[col]].unstack().dropna().reset_index(drop=True)
213 column_data.name = col
214 fu.log(f"Computing base number {col}...")
216 # column series
217 series_colfn = f"series_{self.helpar_name}_{col}"
218 column_data.to_csv(
219 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv"
220 )
221 # save table
222 zf.write(
223 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv",
224 arcname=f"{series_colfn}.csv",
225 )
227 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
228 reduced_data = column_data.iloc[:: self.stride]
229 axs.plot(reduced_data.index, reduced_data.to_numpy())
230 axs.set_xlabel("Time (Snapshots)")
231 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
232 axs.set_title(
233 f"Helical Parameter vs Time: {self.helpar_name.capitalize()} "
234 "(base pair "
235 f"{'step' if self.baselen==1 else ''} {col})"
236 )
237 fig.savefig(
238 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg",
239 format="jpg",
240 )
241 # save plot
242 zf.write(
243 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg",
244 arcname=f"{series_colfn}.jpg",
245 )
246 plt.close()
248 # columns histogram
249 hist_colfn = f"hist_{self.helpar_name}_{col}"
250 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
251 ybins, x, _ = axs.hist(column_data, bins=self.bins)
252 pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv(
253 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv",
254 index=False,
255 )
256 # save table
257 zf.write(
258 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv",
259 arcname=f"{hist_colfn}.csv",
260 )
262 axs.set_ylabel("Density")
263 axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
264 fig.savefig(
265 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg",
266 format="jpg",
267 )
268 # save plot
269 zf.write(
270 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg",
271 arcname=f"{hist_colfn}.jpg",
272 )
273 plt.close()
274 zf.close()
276 # Copy files to host
277 self.copy_to_host()
279 # Remove temporary file(s)
280 # self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")])
281 self.remove_tmp_files()
283 self.check_arguments(output_files_created=True, raise_exception=False)
285 return 0
288def dna_timeseries(
289 input_ser_path: str,
290 output_zip_path: str,
291 properties: Optional[dict] = None,
292 **kwargs,
293) -> int:
294 """Create :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` class and
295 execute the :meth:`launch() <dna.dna_timeseries.HelParTimeSeries.launch>` method."""
297 return HelParTimeSeries(
298 input_ser_path=input_ser_path,
299 output_zip_path=output_zip_path,
300 properties=properties,
301 **kwargs,
302 ).launch()
304 dna_timeseries.__doc__ = HelParTimeSeries.__doc__
307def main():
308 """Command line execution of this building block. Please check the command line documentation."""
309 parser = argparse.ArgumentParser(
310 description="Created time series and histogram plots for each base pair from a helical parameter series file.",
311 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
312 )
313 parser.add_argument("--config", required=False, help="Configuration file")
315 required_args = parser.add_argument_group("required arguments")
316 required_args.add_argument(
317 "--input_ser_path",
318 required=True,
319 help="Helical parameter input ser file path. Accepted formats: ser.",
320 )
321 required_args.add_argument(
322 "--output_zip_path", required=True, help="Path to output directory."
323 )
325 args = parser.parse_args()
326 args.config = args.config or "{}"
327 properties = settings.ConfReader(config=args.config).get_prop_dic()
329 dna_timeseries(
330 input_ser_path=args.input_ser_path,
331 output_zip_path=args.output_zip_path,
332 properties=properties,
333 )
336if __name__ == "__main__":
337 main()