Coverage for biobb_dna / dna / dna_timeseries.py: 82%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-15 18:49 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the HelParTimeSeries class and the command line interface.""" 

4 

5import re 

6import zipfile 

7from pathlib import Path 

8from typing import Optional 

9 

10import matplotlib.pyplot as plt 

11import pandas as pd 

12from biobb_common.generic.biobb_object import BiobbObject 

13from biobb_common.tools import file_utils as fu 

14from biobb_common.tools.file_utils import launchlogger 

15 

16from biobb_dna.utils import constants 

17from biobb_dna.utils.common import _from_string_to_list 

18from biobb_dna.utils.loader import read_series 

19 

20 

21class HelParTimeSeries(BiobbObject): 

22 """ 

23 | biobb_dna HelParTimeSeries 

24 | Created time series and histogram plots for each base pair from a helical parameter series file. 

25 | The helical parameter series file is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. 

26 

27 Args: 

28 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

29 output_zip_path (str): Path to output .zip files where data is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/timeseries_output.zip>`_. Accepted formats: zip (edam:format_3987). 

30 properties (dict): 

31 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *usecols* option). 

32 * **bins** (*int*) - (None) Bins for histogram. Parameter has same options as matplotlib.pyplot.hist. 

33 * **helpar_name** (*str*) - (None) Helical parameter name. It must match the name of the helical parameter in the .ser input file. Values: majd, majw, mind, minw, inclin, tip, xdisp, ydisp, shear, stretch, stagger, buckle, propel, opening, rise, roll, twist, shift, slide, tilt, alphaC, alphaW, betaC, betaW, gammaC, gammaW, deltaC, deltaW, epsilC, epsilW, zetaC, zetaW, chiC, chiW, phaseC, phaseW. 

34 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series. 

35 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 1) to analyze. If not specified it will analyse the complete sequence. 

36 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

37 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

38 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

39 

40 Examples: 

41 This is a use example of how to use the building block from Python:: 

42 

43 from biobb_dna.dna.dna_timeseries import dna_timeseries 

44 

45 prop = { 

46 'helpar_name': 'twist', 

47 'seqpos': [1,2,3,4,5], 

48 'sequence': 'GCAACGTGCTATGGAAGC', 

49 } 

50 dna_timeseries( 

51 input_ser_path='/path/to/twist.ser', 

52 output_zip_path='/path/to/output/file.zip' 

53 properties=prop) 

54 Info: 

55 * wrapped_software: 

56 * name: In house 

57 * license: Apache-2.0 

58 * ontology: 

59 * name: EDAM 

60 * schema: http://edamontology.org/EDAM.owl 

61 

62 """ 

63 

64 def __init__( 

65 self, input_ser_path, output_zip_path, properties=None, **kwargs 

66 ) -> None: 

67 properties = properties or {} 

68 

69 # Call parent class constructor 

70 super().__init__(properties) 

71 self.locals_var_dict = locals().copy() 

72 

73 # Input/Output files 

74 self.io_dict = { 

75 "in": { 

76 "input_ser_path": input_ser_path, 

77 }, 

78 "out": {"output_zip_path": output_zip_path}, 

79 } 

80 

81 self.properties = properties 

82 self.sequence = properties.get("sequence", None) 

83 self.bins = properties.get("bins", "auto") 

84 self.stride = properties.get("stride", 10) 

85 self.seqpos = [ 

86 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

87 ] 

88 self.helpar_name = properties.get("helpar_name", None) 

89 

90 # get helical parameter from filename if not specified 

91 if self.helpar_name is None: 

92 for hp in constants.helical_parameters: 

93 if hp.lower() in Path(input_ser_path).name.lower(): 

94 self.helpar_name = hp 

95 if self.helpar_name is None: 

96 raise ValueError( 

97 "Helical parameter name can't be inferred from file, " 

98 "so it must be specified!" 

99 ) 

100 else: 

101 if self.helpar_name not in constants.helical_parameters: 

102 raise ValueError( 

103 "Helical parameter name is invalid! " 

104 f"Options: {constants.helical_parameters}" 

105 ) 

106 

107 # get base length from helical parameter name 

108 if self.helpar_name.lower() in constants.hp_singlebases: 

109 self.baselen = 0 

110 else: 

111 self.baselen = 1 

112 # get unit from helical parameter name 

113 if self.helpar_name in constants.hp_angular: 

114 self.hp_unit = "Degrees" 

115 else: 

116 self.hp_unit = "Angstroms" 

117 

118 # Check the properties 

119 self.check_properties(properties) 

120 self.check_arguments() 

121 

122 @launchlogger 

123 def launch(self) -> int: 

124 """Execute the :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` object.""" 

125 

126 # Setup Biobb 

127 if self.check_restart(): 

128 return 0 

129 self.stage_files() 

130 

131 # check sequence 

132 if self.sequence is None or len(self.sequence) < 2: 

133 raise ValueError("sequence is null or too short!") 

134 

135 # calculate cols with 0 index 

136 if self.seqpos: 

137 cols = [i - 1 for i in self.seqpos] 

138 else: 

139 cols = list(range(len(self.sequence))) 

140 

141 # sort cols in ascending order 

142 cols.sort() 

143 

144 # check seqpos for base pairs 

145 if self.seqpos and self.helpar_name in constants.hp_basepairs: 

146 if (max(cols) > len(self.sequence) - 2) or (min(cols) < 0): 

147 raise ValueError( 

148 f"seqpos values must be between 1 and {len(self.sequence) - 1}" 

149 ) 

150 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

151 raise ValueError("seqpos must be a list of at least two integers") 

152 # check seqpos for non base pairs 

153 elif self.seqpos and self.helpar_name not in constants.hp_basepairs: 

154 if (max(cols) > len(self.sequence) - 1) or (min(cols) < 0): 

155 raise ValueError( 

156 f"seqpos values must be between 1 and {len(self.sequence)}" 

157 ) 

158 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

159 raise ValueError("seqpos must be a list of at least two integers") 

160 

161 if self.helpar_name in constants.hp_basepairs: 

162 # remove first and last base pairs from cols if they match 0 and len(sequence) 

163 if min(cols) == 0: 

164 cols.pop(0) 

165 if max(cols) == len(self.sequence) - 1: 

166 cols.pop(-1) 

167 

168 # discard first and last base(pairs) from sequence 

169 sequence = self.sequence[1:-1] 

170 # create indices list 

171 indices = cols.copy() 

172 # create subunits list from cols 

173 subunits = [f"{i+1}_{sequence[i-1:i+self.baselen]}" for i in cols] 

174 # clean subunits (leave only basepairs) 

175 pattern = re.compile(r"\d+_[A-Za-z]{2}") 

176 # get removed items 

177 removed_items = [s for s in subunits if not pattern.fullmatch(s)] 

178 # get indices of removed items (in integer format and starting from 0) 

179 removed_numbers = [ 

180 int(match.group()) 

181 for item in removed_items 

182 if (match := re.match(r"\d+", item)) 

183 ] 

184 removed_numbers = list(map(int, removed_numbers)) 

185 removed_numbers = [int(i) - 1 for i in removed_numbers] 

186 # remove non basepairs from subunits and indices 

187 subunits = [s for s in subunits if pattern.fullmatch(s)] 

188 indices = [i for i in indices if i not in removed_numbers] 

189 else: 

190 sequence = self.sequence 

191 # create indices list 

192 indices = cols.copy() 

193 # trick for getting the index column from the .ser file 

194 indices.insert(0, 0) 

195 # create subunits list from cols 

196 subunits = [f"{i+1}_{sequence[i:i+1+self.baselen]}" for i in cols] 

197 

198 # read input .ser file 

199 ser_data = read_series( 

200 self.stage_io_dict["in"]["input_ser_path"], usecols=indices 

201 ) 

202 

203 # get columns for selected bases 

204 ser_data.columns = subunits 

205 

206 # write output files for all selected bases (one per column) 

207 zf = zipfile.ZipFile(Path(self.stage_io_dict["out"]["output_zip_path"]), "w") 

208 for col in ser_data.columns: 

209 # unstack columns to prevent errors from repeated base pairs 

210 column_data = ser_data[[col]].unstack().dropna().reset_index(drop=True) 

211 column_data.name = col 

212 fu.log(f"Computing base number {col}...") 

213 

214 # column series 

215 series_colfn = f"series_{self.helpar_name}_{col}" 

216 column_data.to_csv( 

217 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv" 

218 ) 

219 # save table 

220 zf.write( 

221 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv", 

222 arcname=f"{series_colfn}.csv", 

223 ) 

224 

225 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

226 reduced_data = column_data.iloc[:: self.stride] 

227 axs.plot(reduced_data.index, reduced_data.to_numpy()) 

228 axs.set_xlabel("Time (Snapshots)") 

229 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

230 axs.set_title( 

231 f"Helical Parameter vs Time: {self.helpar_name.capitalize()} " 

232 "(base pair " 

233 f"{'step' if self.baselen == 1 else ''} {col})" 

234 ) 

235 fig.savefig( 

236 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", 

237 format="jpg", 

238 ) 

239 # save plot 

240 zf.write( 

241 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", 

242 arcname=f"{series_colfn}.jpg", 

243 ) 

244 plt.close() 

245 

246 # columns histogram 

247 hist_colfn = f"hist_{self.helpar_name}_{col}" 

248 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

249 ybins, x, _ = axs.hist(column_data, bins=self.bins) 

250 pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv( 

251 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv", 

252 index=False, 

253 ) 

254 # save table 

255 zf.write( 

256 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv", 

257 arcname=f"{hist_colfn}.csv", 

258 ) 

259 

260 axs.set_ylabel("Density") 

261 axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

262 fig.savefig( 

263 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg", 

264 format="jpg", 

265 ) 

266 # save plot 

267 zf.write( 

268 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg", 

269 arcname=f"{hist_colfn}.jpg", 

270 ) 

271 plt.close() 

272 zf.close() 

273 

274 # Copy files to host 

275 self.copy_to_host() 

276 

277 # Remove temporary file(s) 

278 self.remove_tmp_files() 

279 

280 self.check_arguments(output_files_created=True, raise_exception=False) 

281 

282 return 0 

283 

284 

285def dna_timeseries( 

286 input_ser_path: str, 

287 output_zip_path: str, 

288 properties: Optional[dict] = None, 

289 **kwargs, 

290) -> int: 

291 """Create :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` class and 

292 execute the :meth:`launch() <dna.dna_timeseries.HelParTimeSeries.launch>` method.""" 

293 return HelParTimeSeries(**dict(locals())).launch() 

294 

295 

296dna_timeseries.__doc__ = HelParTimeSeries.__doc__ 

297main = HelParTimeSeries.get_main(dna_timeseries, "Created time series and histogram plots for each base pair from a helical parameter series file.") 

298 

299if __name__ == '__main__': 

300 main()