Coverage for biobb_dna/dna/dna_timeseries.py: 80%

110 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the HelParTimeSeries class and the command line interface.""" 

4import argparse 

5import shutil 

6import zipfile 

7from pathlib import Path 

8 

9import pandas as pd 

10import matplotlib.pyplot as plt 

11from biobb_dna.utils import constants 

12from biobb_dna.utils.loader import read_series 

13from biobb_common.generic.biobb_object import BiobbObject 

14from biobb_common.configuration import settings 

15from biobb_common.tools import file_utils as fu 

16from biobb_common.tools.file_utils import launchlogger 

17 

18 

19class HelParTimeSeries(BiobbObject): 

20 """ 

21 | biobb_dna HelParTimeSeries 

22 | Created time series and histogram plots for each base pair from a helical parameter series file. 

23 

24 Args: 

25 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

26 output_zip_path (str): Path to output .zip files where data is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/timeseries_output.zip>`_. Accepted formats: zip (edam:format_3987). 

27 properties (dict): 

28 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *usecols* option). 

29 * **bins** (*int*) - (None) Bins for histogram. Parameter has same options as matplotlib.pyplot.hist. 

30 * **helpar_name** (*str*) - (Optional) helical parameter name. 

31 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series. 

32 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

33 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

34 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

35 

36 Examples: 

37 This is a use example of how to use the building block from Python:: 

38 

39 from biobb_dna.dna.dna_timeseries import dna_timeseries 

40 

41 prop = { 

42 'helpar_name': 'twist', 

43 'seqpos': [1,2,3,4,5], 

44 'sequence': 'GCAACGTGCTATGGAAGC', 

45 } 

46 dna_timeseries( 

47 input_ser_path='/path/to/twist.ser', 

48 output_zip_path='/path/to/output/file.zip' 

49 properties=prop) 

50 Info: 

51 * wrapped_software: 

52 * name: In house 

53 * license: Apache-2.0 

54 * ontology: 

55 * name: EDAM 

56 * schema: http://edamontology.org/EDAM.owl 

57 

58 """ 

59 

60 def __init__(self, input_ser_path, output_zip_path, 

61 properties=None, **kwargs) -> None: 

62 properties = properties or {} 

63 

64 # Call parent class constructor 

65 super().__init__(properties) 

66 self.locals_var_dict = locals().copy() 

67 

68 # Input/Output files 

69 self.io_dict = { 

70 'in': { 

71 'input_ser_path': input_ser_path, 

72 }, 

73 'out': { 

74 'output_zip_path': output_zip_path 

75 } 

76 } 

77 

78 self.properties = properties 

79 self.sequence = properties.get("sequence", None) 

80 self.bins = properties.get("bins", "auto") 

81 self.stride = properties.get( 

82 "stride", 10) 

83 self.seqpos = properties.get( 

84 "seqpos", None) 

85 self.helpar_name = properties.get( 

86 "helpar_name", None) 

87 

88 # get helical parameter from filename if not specified 

89 if self.helpar_name is None: 

90 for hp in constants.helical_parameters: 

91 if hp.lower() in Path(input_ser_path).name.lower(): 

92 self.helpar_name = hp 

93 if self.helpar_name is None: 

94 raise ValueError( 

95 "Helical parameter name can't be inferred from file, " 

96 "so it must be specified!") 

97 else: 

98 if self.helpar_name not in constants.helical_parameters: 

99 raise ValueError( 

100 "Helical parameter name is invalid! " 

101 f"Options: {constants.helical_parameters}") 

102 

103 # get base length and unit from helical parameter name 

104 if self.helpar_name.lower() in constants.hp_singlebases: 

105 self.baselen = 0 

106 else: 

107 self.baselen = 1 

108 if self.helpar_name in constants.hp_angular: 

109 self.hp_unit = "Degrees" 

110 else: 

111 self.hp_unit = "Angstroms" 

112 

113 # Check the properties 

114 self.check_properties(properties) 

115 self.check_arguments() 

116 

117 @launchlogger 

118 def launch(self) -> int: 

119 """Execute the :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` object.""" 

120 

121 # Setup Biobb 

122 if self.check_restart(): 

123 return 0 

124 self.stage_files() 

125 

126 # check sequence 

127 if self.sequence is None or len(self.sequence) < 2: 

128 raise ValueError("sequence is null or too short!") 

129 

130 # check seqpos 

131 if self.seqpos is not None: 

132 if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): 

133 raise ValueError( 

134 f"seqpos values must be between 1 and {len(self.sequence) - 2}") 

135 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

136 raise ValueError( 

137 "seqpos must be a list of at least two integers") 

138 

139 # Creating temporary folder 

140 self.tmp_folder = fu.create_unique_dir(prefix="timeseries_") 

141 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log) 

142 

143 # Copy input_ser_path to temporary folder 

144 shutil.copy(self.io_dict['in']['input_ser_path'], self.tmp_folder) 

145 

146 # read input .ser file 

147 ser_data = read_series( 

148 self.io_dict['in']['input_ser_path'], 

149 usecols=self.seqpos) 

150 if self.seqpos is None: 

151 ser_data = ser_data[ser_data.columns[1:-1]] 

152 # discard first and last base(pairs) from sequence 

153 sequence = self.sequence[1:] 

154 subunits = [ 

155 f"{i+1}_{sequence[i:i+1+self.baselen]}" 

156 for i in range(len(ser_data.columns))] 

157 else: 

158 sequence = self.sequence 

159 subunits = [ 

160 f"{i+1}_{sequence[i:i+1+self.baselen]}" 

161 for i in self.seqpos] 

162 ser_data.columns = subunits 

163 

164 # write output files for all selected bases (one per column) 

165 zf = zipfile.ZipFile( 

166 Path(self.io_dict["out"]["output_zip_path"]), "w") 

167 for col in ser_data.columns: 

168 # unstack columns to prevent errors from repeated base pairs 

169 column_data = ( 

170 ser_data[[col]] 

171 .unstack() 

172 .dropna() 

173 .reset_index(drop=True)) 

174 column_data.name = col 

175 fu.log(f"Computing base number {col}...") 

176 

177 # column series 

178 series_colfn = f"series_{self.helpar_name}_{col}" 

179 column_data.to_csv( 

180 Path(self.tmp_folder) / f"{series_colfn}.csv") 

181 # save table 

182 zf.write( 

183 Path(self.tmp_folder) / f"{series_colfn}.csv", arcname=f"{series_colfn}.csv") 

184 

185 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

186 reduced_data = column_data.iloc[::self.stride] 

187 axs.plot(reduced_data.index, reduced_data.to_numpy()) 

188 axs.set_xlabel("Time (Snapshots)") 

189 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

190 axs.set_title( 

191 f"Helical Parameter vs Time: {self.helpar_name.capitalize()} " 

192 "(base pair " 

193 f"{'step' if self.baselen==1 else ''} {col})") 

194 fig.savefig( 

195 Path(self.tmp_folder) / f"{series_colfn}.jpg", format="jpg") 

196 # save plot 

197 zf.write( 

198 Path(self.tmp_folder) / f"{series_colfn}.jpg", arcname=f"{series_colfn}.jpg") 

199 plt.close() 

200 

201 # columns histogram 

202 hist_colfn = f"hist_{self.helpar_name}_{col}" 

203 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

204 ybins, x, _ = axs.hist(column_data, bins=self.bins) 

205 pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv( 

206 Path(self.tmp_folder) / f"{hist_colfn}.csv", 

207 index=False) 

208 # save table 

209 zf.write( 

210 Path(self.tmp_folder) / f"{hist_colfn}.csv", 

211 arcname=f"{hist_colfn}.csv") 

212 

213 axs.set_ylabel("Density") 

214 axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

215 fig.savefig( 

216 Path(self.tmp_folder) / f"{hist_colfn}.jpg", 

217 format="jpg") 

218 # save plot 

219 zf.write( 

220 Path(self.tmp_folder) / f"{hist_colfn}.jpg", 

221 arcname=f"{hist_colfn}.jpg") 

222 plt.close() 

223 zf.close() 

224 

225 # Remove temporary file(s) 

226 self.tmp_files.extend([ 

227 self.stage_io_dict.get("unique_dir"), 

228 self.tmp_folder 

229 ]) 

230 self.remove_tmp_files() 

231 

232 self.check_arguments(output_files_created=True, raise_exception=False) 

233 

234 return 0 

235 

236 

237def dna_timeseries( 

238 input_ser_path: str, output_zip_path: str, 

239 properties: dict = None, **kwargs) -> int: 

240 """Create :class:`HelParTimeSeries <dna.dna_timeseries.HelParTimeSeries>` class and 

241 execute the :meth:`launch() <dna.dna_timeseries.HelParTimeSeries.launch>` method.""" 

242 

243 return HelParTimeSeries( 

244 input_ser_path=input_ser_path, 

245 output_zip_path=output_zip_path, 

246 properties=properties, **kwargs).launch() 

247 

248 

249def main(): 

250 """Command line execution of this building block. Please check the command line documentation.""" 

251 parser = argparse.ArgumentParser(description='Created time series and histogram plots for each base pair from a helical parameter series file.', 

252 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

253 parser.add_argument('--config', required=False, 

254 help='Configuration file') 

255 

256 required_args = parser.add_argument_group('required arguments') 

257 required_args.add_argument('--input_ser_path', required=True, 

258 help='Helical parameter input ser file path. Accepted formats: ser.') 

259 required_args.add_argument('--output_zip_path', required=True, 

260 help='Path to output directory.') 

261 

262 args = parser.parse_args() 

263 args.config = args.config or "{}" 

264 properties = settings.ConfReader(config=args.config).get_prop_dic() 

265 

266 dna_timeseries( 

267 input_ser_path=args.input_ser_path, 

268 output_zip_path=args.output_zip_path, 

269 properties=properties) 

270 

271 

272if __name__ == '__main__': 

273 main()