Coverage for biobb_dna / dna / dna_timeseries.py: 82%
120 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-15 18:49 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-15 18:49 +0000
1#!/usr/bin/env python3
3"""Module containing the HelParTimeSeries class and the command line interface."""
5import re
6import zipfile
7from pathlib import Path
8from typing import Optional
10import matplotlib.pyplot as plt
11import pandas as pd
12from biobb_common.generic.biobb_object import BiobbObject
13from biobb_common.tools import file_utils as fu
14from biobb_common.tools.file_utils import launchlogger
16from biobb_dna.utils import constants
17from biobb_dna.utils.common import _from_string_to_list
18from biobb_dna.utils.loader import read_series
21class HelParTimeSeries(BiobbObject):
22 """
23 | biobb_dna HelParTimeSeries
24 | Created time series and histogram plots for each base pair from a helical parameter series file.
25 | The helical parameter series file is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair.
27 Args:
28 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
29 output_zip_path (str): Path to output .zip files where data is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/timeseries_output.zip>`_. Accepted formats: zip (edam:format_3987).
30 properties (dict):
31 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *usecols* option).
32 * **bins** (*int*) - (None) Bins for histogram. Parameter has same options as matplotlib.pyplot.hist.
33 * **helpar_name** (*str*) - (None) Helical parameter name. It must match the name of the helical parameter in the .ser input file. Values: majd, majw, mind, minw, inclin, tip, xdisp, ydisp, shear, stretch, stagger, buckle, propel, opening, rise, roll, twist, shift, slide, tilt, alphaC, alphaW, betaC, betaW, gammaC, gammaW, deltaC, deltaW, epsilC, epsilW, zetaC, zetaW, chiC, chiW, phaseC, phaseW.
34 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series.
35 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 1) to analyze. If not specified it will analyse the complete sequence.
36 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
37 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
38 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
40 Examples:
41 This is a use example of how to use the building block from Python::
43 from biobb_dna.dna.dna_timeseries import dna_timeseries
45 prop = {
46 'helpar_name': 'twist',
47 'seqpos': [1,2,3,4,5],
48 'sequence': 'GCAACGTGCTATGGAAGC',
49 }
50 dna_timeseries(
51 input_ser_path='/path/to/twist.ser',
52 output_zip_path='/path/to/output/file.zip'
53 properties=prop)
54 Info:
55 * wrapped_software:
56 * name: In house
57 * license: Apache-2.0
58 * ontology:
59 * name: EDAM
60 * schema: http://edamontology.org/EDAM.owl
62 """
64 def __init__(
65 self, input_ser_path, output_zip_path, properties=None, **kwargs
66 ) -> None:
67 properties = properties or {}
69 # Call parent class constructor
70 super().__init__(properties)
71 self.locals_var_dict = locals().copy()
73 # Input/Output files
74 self.io_dict = {
75 "in": {
76 "input_ser_path": input_ser_path,
77 },
78 "out": {"output_zip_path": output_zip_path},
79 }
81 self.properties = properties
82 self.sequence = properties.get("sequence", None)
83 self.bins = properties.get("bins", "auto")
84 self.stride = properties.get("stride", 10)
85 self.seqpos = [
86 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
87 ]
88 self.helpar_name = properties.get("helpar_name", None)
90 # get helical parameter from filename if not specified
91 if self.helpar_name is None:
92 for hp in constants.helical_parameters:
93 if hp.lower() in Path(input_ser_path).name.lower():
94 self.helpar_name = hp
95 if self.helpar_name is None:
96 raise ValueError(
97 "Helical parameter name can't be inferred from file, "
98 "so it must be specified!"
99 )
100 else:
101 if self.helpar_name not in constants.helical_parameters:
102 raise ValueError(
103 "Helical parameter name is invalid! "
104 f"Options: {constants.helical_parameters}"
105 )
107 # get base length from helical parameter name
108 if self.helpar_name.lower() in constants.hp_singlebases:
109 self.baselen = 0
110 else:
111 self.baselen = 1
112 # get unit from helical parameter name
113 if self.helpar_name in constants.hp_angular:
114 self.hp_unit = "Degrees"
115 else:
116 self.hp_unit = "Angstroms"
118 # Check the properties
119 self.check_properties(properties)
120 self.check_arguments()
122 @launchlogger
123 def launch(self) -> int:
124 """Execute the :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` object."""
126 # Setup Biobb
127 if self.check_restart():
128 return 0
129 self.stage_files()
131 # check sequence
132 if self.sequence is None or len(self.sequence) < 2:
133 raise ValueError("sequence is null or too short!")
135 # calculate cols with 0 index
136 if self.seqpos:
137 cols = [i - 1 for i in self.seqpos]
138 else:
139 cols = list(range(len(self.sequence)))
141 # sort cols in ascending order
142 cols.sort()
144 # check seqpos for base pairs
145 if self.seqpos and self.helpar_name in constants.hp_basepairs:
146 if (max(cols) > len(self.sequence) - 2) or (min(cols) < 0):
147 raise ValueError(
148 f"seqpos values must be between 1 and {len(self.sequence) - 1}"
149 )
150 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
151 raise ValueError("seqpos must be a list of at least two integers")
152 # check seqpos for non base pairs
153 elif self.seqpos and self.helpar_name not in constants.hp_basepairs:
154 if (max(cols) > len(self.sequence) - 1) or (min(cols) < 0):
155 raise ValueError(
156 f"seqpos values must be between 1 and {len(self.sequence)}"
157 )
158 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
159 raise ValueError("seqpos must be a list of at least two integers")
161 if self.helpar_name in constants.hp_basepairs:
162 # remove first and last base pairs from cols if they match 0 and len(sequence)
163 if min(cols) == 0:
164 cols.pop(0)
165 if max(cols) == len(self.sequence) - 1:
166 cols.pop(-1)
168 # discard first and last base(pairs) from sequence
169 sequence = self.sequence[1:-1]
170 # create indices list
171 indices = cols.copy()
172 # create subunits list from cols
173 subunits = [f"{i+1}_{sequence[i-1:i+self.baselen]}" for i in cols]
174 # clean subunits (leave only basepairs)
175 pattern = re.compile(r"\d+_[A-Za-z]{2}")
176 # get removed items
177 removed_items = [s for s in subunits if not pattern.fullmatch(s)]
178 # get indices of removed items (in integer format and starting from 0)
179 removed_numbers = [
180 int(match.group())
181 for item in removed_items
182 if (match := re.match(r"\d+", item))
183 ]
184 removed_numbers = list(map(int, removed_numbers))
185 removed_numbers = [int(i) - 1 for i in removed_numbers]
186 # remove non basepairs from subunits and indices
187 subunits = [s for s in subunits if pattern.fullmatch(s)]
188 indices = [i for i in indices if i not in removed_numbers]
189 else:
190 sequence = self.sequence
191 # create indices list
192 indices = cols.copy()
193 # trick for getting the index column from the .ser file
194 indices.insert(0, 0)
195 # create subunits list from cols
196 subunits = [f"{i+1}_{sequence[i:i+1+self.baselen]}" for i in cols]
198 # read input .ser file
199 ser_data = read_series(
200 self.stage_io_dict["in"]["input_ser_path"], usecols=indices
201 )
203 # get columns for selected bases
204 ser_data.columns = subunits
206 # write output files for all selected bases (one per column)
207 zf = zipfile.ZipFile(Path(self.stage_io_dict["out"]["output_zip_path"]), "w")
208 for col in ser_data.columns:
209 # unstack columns to prevent errors from repeated base pairs
210 column_data = ser_data[[col]].unstack().dropna().reset_index(drop=True)
211 column_data.name = col
212 fu.log(f"Computing base number {col}...")
214 # column series
215 series_colfn = f"series_{self.helpar_name}_{col}"
216 column_data.to_csv(
217 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv"
218 )
219 # save table
220 zf.write(
221 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv",
222 arcname=f"{series_colfn}.csv",
223 )
225 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
226 reduced_data = column_data.iloc[:: self.stride]
227 axs.plot(reduced_data.index, reduced_data.to_numpy())
228 axs.set_xlabel("Time (Snapshots)")
229 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
230 axs.set_title(
231 f"Helical Parameter vs Time: {self.helpar_name.capitalize()} "
232 "(base pair "
233 f"{'step' if self.baselen == 1 else ''} {col})"
234 )
235 fig.savefig(
236 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg",
237 format="jpg",
238 )
239 # save plot
240 zf.write(
241 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg",
242 arcname=f"{series_colfn}.jpg",
243 )
244 plt.close()
246 # columns histogram
247 hist_colfn = f"hist_{self.helpar_name}_{col}"
248 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
249 ybins, x, _ = axs.hist(column_data, bins=self.bins)
250 pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv(
251 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv",
252 index=False,
253 )
254 # save table
255 zf.write(
256 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv",
257 arcname=f"{hist_colfn}.csv",
258 )
260 axs.set_ylabel("Density")
261 axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})")
262 fig.savefig(
263 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg",
264 format="jpg",
265 )
266 # save plot
267 zf.write(
268 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg",
269 arcname=f"{hist_colfn}.jpg",
270 )
271 plt.close()
272 zf.close()
274 # Copy files to host
275 self.copy_to_host()
277 # Remove temporary file(s)
278 self.remove_tmp_files()
280 self.check_arguments(output_files_created=True, raise_exception=False)
282 return 0
285def dna_timeseries(
286 input_ser_path: str,
287 output_zip_path: str,
288 properties: Optional[dict] = None,
289 **kwargs,
290) -> int:
291 """Create :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` class and
292 execute the :meth:`launch() <dna.dna_timeseries.HelParTimeSeries.launch>` method."""
293 return HelParTimeSeries(**dict(locals())).launch()
296dna_timeseries.__doc__ = HelParTimeSeries.__doc__
297main = HelParTimeSeries.get_main(dna_timeseries, "Created time series and histogram plots for each base pair from a helical parameter series file.")
299if __name__ == '__main__':
300 main()