Coverage for biobb_dna/dna/dna_timeseries.py: 76%

131 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 10:36 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the HelParTimeSeries class and the command line interface.""" 

4 

5import argparse 

6import re 

7import zipfile 

8from pathlib import Path 

9from typing import Optional 

10 

11import matplotlib.pyplot as plt 

12import pandas as pd 

13from biobb_common.configuration import settings 

14from biobb_common.generic.biobb_object import BiobbObject 

15from biobb_common.tools import file_utils as fu 

16from biobb_common.tools.file_utils import launchlogger 

17 

18from biobb_dna.utils import constants 

19from biobb_dna.utils.common import _from_string_to_list 

20from biobb_dna.utils.loader import read_series 

21 

22 

23class HelParTimeSeries(BiobbObject): 

24 """ 

25 | biobb_dna HelParTimeSeries 

26 | Created time series and histogram plots for each base pair from a helical parameter series file. 

27 | The helical parameter series file is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. 

28 

29 Args: 

30 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

31 output_zip_path (str): Path to output .zip files where data is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/timeseries_output.zip>`_. Accepted formats: zip (edam:format_3987). 

32 properties (dict): 

33 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *usecols* option). 

34 * **bins** (*int*) - (None) Bins for histogram. Parameter has same options as matplotlib.pyplot.hist. 

35 * **helpar_name** (*str*) - (None) Helical parameter name. It must match the name of the helical parameter in the .ser input file. Values: majd, majw, mind, minw, inclin, tip, xdisp, ydisp, shear, stretch, stagger, buckle, propel, opening, rise, roll, twist, shift, slide, tilt, alphaC, alphaW, betaC, betaW, gammaC, gammaW, deltaC, deltaW, epsilC, epsilW, zetaC, zetaW, chiC, chiW, phaseC, phaseW. 

36 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series. 

37 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 1) to analyze. If not specified it will analyse the complete sequence. 

38 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

39 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

40 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

41 

42 Examples: 

43 This is a use example of how to use the building block from Python:: 

44 

45 from biobb_dna.dna.dna_timeseries import dna_timeseries 

46 

47 prop = { 

48 'helpar_name': 'twist', 

49 'seqpos': [1,2,3,4,5], 

50 'sequence': 'GCAACGTGCTATGGAAGC', 

51 } 

52 dna_timeseries( 

53 input_ser_path='/path/to/twist.ser', 

54 output_zip_path='/path/to/output/file.zip' 

55 properties=prop) 

56 Info: 

57 * wrapped_software: 

58 * name: In house 

59 * license: Apache-2.0 

60 * ontology: 

61 * name: EDAM 

62 * schema: http://edamontology.org/EDAM.owl 

63 

64 """ 

65 

66 def __init__( 

67 self, input_ser_path, output_zip_path, properties=None, **kwargs 

68 ) -> None: 

69 properties = properties or {} 

70 

71 # Call parent class constructor 

72 super().__init__(properties) 

73 self.locals_var_dict = locals().copy() 

74 

75 # Input/Output files 

76 self.io_dict = { 

77 "in": { 

78 "input_ser_path": input_ser_path, 

79 }, 

80 "out": {"output_zip_path": output_zip_path}, 

81 } 

82 

83 self.properties = properties 

84 self.sequence = properties.get("sequence", None) 

85 self.bins = properties.get("bins", "auto") 

86 self.stride = properties.get("stride", 10) 

87 self.seqpos = [ 

88 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

89 ] 

90 self.helpar_name = properties.get("helpar_name", None) 

91 

92 # get helical parameter from filename if not specified 

93 if self.helpar_name is None: 

94 for hp in constants.helical_parameters: 

95 if hp.lower() in Path(input_ser_path).name.lower(): 

96 self.helpar_name = hp 

97 if self.helpar_name is None: 

98 raise ValueError( 

99 "Helical parameter name can't be inferred from file, " 

100 "so it must be specified!" 

101 ) 

102 else: 

103 if self.helpar_name not in constants.helical_parameters: 

104 raise ValueError( 

105 "Helical parameter name is invalid! " 

106 f"Options: {constants.helical_parameters}" 

107 ) 

108 

109 # get base length from helical parameter name 

110 if self.helpar_name.lower() in constants.hp_singlebases: 

111 self.baselen = 0 

112 else: 

113 self.baselen = 1 

114 # get unit from helical parameter name 

115 if self.helpar_name in constants.hp_angular: 

116 self.hp_unit = "Degrees" 

117 else: 

118 self.hp_unit = "Angstroms" 

119 

120 # Check the properties 

121 self.check_properties(properties) 

122 self.check_arguments() 

123 

124 @launchlogger 

125 def launch(self) -> int: 

126 """Execute the :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` object.""" 

127 

128 # Setup Biobb 

129 if self.check_restart(): 

130 return 0 

131 self.stage_files() 

132 

133 # check sequence 

134 if self.sequence is None or len(self.sequence) < 2: 

135 raise ValueError("sequence is null or too short!") 

136 

137 # calculate cols with 0 index 

138 if self.seqpos: 

139 cols = [i - 1 for i in self.seqpos] 

140 else: 

141 cols = list(range(len(self.sequence))) 

142 

143 # sort cols in ascending order 

144 cols.sort() 

145 

146 # check seqpos for base pairs 

147 if self.seqpos and self.helpar_name in constants.hp_basepairs: 

148 if (max(cols) > len(self.sequence) - 2) or (min(cols) < 0): 

149 raise ValueError( 

150 f"seqpos values must be between 1 and {len(self.sequence) - 1}" 

151 ) 

152 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

153 raise ValueError("seqpos must be a list of at least two integers") 

154 # check seqpos for non base pairs 

155 elif self.seqpos and self.helpar_name not in constants.hp_basepairs: 

156 if (max(cols) > len(self.sequence) - 1) or (min(cols) < 0): 

157 raise ValueError( 

158 f"seqpos values must be between 1 and {len(self.sequence)}" 

159 ) 

160 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

161 raise ValueError("seqpos must be a list of at least two integers") 

162 

163 if self.helpar_name in constants.hp_basepairs: 

164 # remove first and last base pairs from cols if they match 0 and len(sequence) 

165 if min(cols) == 0: 

166 cols.pop(0) 

167 if max(cols) == len(self.sequence) - 1: 

168 cols.pop(-1) 

169 

170 # discard first and last base(pairs) from sequence 

171 sequence = self.sequence[1:-1] 

172 # create indices list 

173 indices = cols.copy() 

174 # create subunits list from cols 

175 subunits = [f"{i+1}_{sequence[i-1:i+self.baselen]}" for i in cols] 

176 # clean subunits (leave only basepairs) 

177 pattern = re.compile(r"\d+_[A-Za-z]{2}") 

178 # get removed items 

179 removed_items = [s for s in subunits if not pattern.fullmatch(s)] 

180 # get indices of removed items (in integer format and starting from 0) 

181 removed_numbers = [ 

182 int(match.group()) 

183 for item in removed_items 

184 if (match := re.match(r"\d+", item)) 

185 ] 

186 removed_numbers = list(map(int, removed_numbers)) 

187 removed_numbers = [int(i) - 1 for i in removed_numbers] 

188 # remove non basepairs from subunits and indices 

189 subunits = [s for s in subunits if pattern.fullmatch(s)] 

190 indices = [i for i in indices if i not in removed_numbers] 

191 else: 

192 sequence = self.sequence 

193 # create indices list 

194 indices = cols.copy() 

195 # trick for getting the index column from the .ser file 

196 indices.insert(0, 0) 

197 # create subunits list from cols 

198 subunits = [f"{i+1}_{sequence[i:i+1+self.baselen]}" for i in cols] 

199 

200 # read input .ser file 

201 ser_data = read_series( 

202 self.stage_io_dict["in"]["input_ser_path"], usecols=indices 

203 ) 

204 

205 # get columns for selected bases 

206 ser_data.columns = subunits 

207 

208 # write output files for all selected bases (one per column) 

209 zf = zipfile.ZipFile(Path(self.stage_io_dict["out"]["output_zip_path"]), "w") 

210 for col in ser_data.columns: 

211 # unstack columns to prevent errors from repeated base pairs 

212 column_data = ser_data[[col]].unstack().dropna().reset_index(drop=True) 

213 column_data.name = col 

214 fu.log(f"Computing base number {col}...") 

215 

216 # column series 

217 series_colfn = f"series_{self.helpar_name}_{col}" 

218 column_data.to_csv( 

219 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv" 

220 ) 

221 # save table 

222 zf.write( 

223 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv", 

224 arcname=f"{series_colfn}.csv", 

225 ) 

226 

227 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

228 reduced_data = column_data.iloc[:: self.stride] 

229 axs.plot(reduced_data.index, reduced_data.to_numpy()) 

230 axs.set_xlabel("Time (Snapshots)") 

231 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

232 axs.set_title( 

233 f"Helical Parameter vs Time: {self.helpar_name.capitalize()} " 

234 "(base pair " 

235 f"{'step' if self.baselen==1 else ''} {col})" 

236 ) 

237 fig.savefig( 

238 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", 

239 format="jpg", 

240 ) 

241 # save plot 

242 zf.write( 

243 Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", 

244 arcname=f"{series_colfn}.jpg", 

245 ) 

246 plt.close() 

247 

248 # columns histogram 

249 hist_colfn = f"hist_{self.helpar_name}_{col}" 

250 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

251 ybins, x, _ = axs.hist(column_data, bins=self.bins) 

252 pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv( 

253 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv", 

254 index=False, 

255 ) 

256 # save table 

257 zf.write( 

258 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv", 

259 arcname=f"{hist_colfn}.csv", 

260 ) 

261 

262 axs.set_ylabel("Density") 

263 axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

264 fig.savefig( 

265 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg", 

266 format="jpg", 

267 ) 

268 # save plot 

269 zf.write( 

270 Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg", 

271 arcname=f"{hist_colfn}.jpg", 

272 ) 

273 plt.close() 

274 zf.close() 

275 

276 # Copy files to host 

277 self.copy_to_host() 

278 

279 # Remove temporary file(s) 

280 # self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) 

281 self.remove_tmp_files() 

282 

283 self.check_arguments(output_files_created=True, raise_exception=False) 

284 

285 return 0 

286 

287 

288def dna_timeseries( 

289 input_ser_path: str, 

290 output_zip_path: str, 

291 properties: Optional[dict] = None, 

292 **kwargs, 

293) -> int: 

294 """Create :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` class and 

295 execute the :meth:`launch() <dna.dna_timeseries.HelParTimeSeries.launch>` method.""" 

296 

297 return HelParTimeSeries( 

298 input_ser_path=input_ser_path, 

299 output_zip_path=output_zip_path, 

300 properties=properties, 

301 **kwargs, 

302 ).launch() 

303 

304 dna_timeseries.__doc__ = HelParTimeSeries.__doc__ 

305 

306 

307def main(): 

308 """Command line execution of this building block. Please check the command line documentation.""" 

309 parser = argparse.ArgumentParser( 

310 description="Created time series and histogram plots for each base pair from a helical parameter series file.", 

311 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), 

312 ) 

313 parser.add_argument("--config", required=False, help="Configuration file") 

314 

315 required_args = parser.add_argument_group("required arguments") 

316 required_args.add_argument( 

317 "--input_ser_path", 

318 required=True, 

319 help="Helical parameter input ser file path. Accepted formats: ser.", 

320 ) 

321 required_args.add_argument( 

322 "--output_zip_path", required=True, help="Path to output directory." 

323 ) 

324 

325 args = parser.parse_args() 

326 args.config = args.config or "{}" 

327 properties = settings.ConfReader(config=args.config).get_prop_dic() 

328 

329 dna_timeseries( 

330 input_ser_path=args.input_ser_path, 

331 output_zip_path=args.output_zip_path, 

332 properties=properties, 

333 ) 

334 

335 

336if __name__ == "__main__": 

337 main()