Coverage for biobb_dna/dna/dna_averages.py: 72%

95 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 10:36 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the HelParAverages class and the command line interface.""" 

4 

5import argparse 

6from pathlib import Path 

7from typing import Optional 

8 

9import matplotlib.pyplot as plt 

10import pandas as pd 

11from biobb_common.configuration import settings 

12from biobb_common.generic.biobb_object import BiobbObject 

13from biobb_common.tools.file_utils import launchlogger 

14 

15from biobb_dna.utils import constants 

16from biobb_dna.utils.common import _from_string_to_list 

17from biobb_dna.utils.loader import read_series 

18 

19 

20class HelParAverages(BiobbObject): 

21 """ 

22 | biobb_dna HelParAverages 

23 | Load .ser file for a given helical parameter and read each column corresponding to a base calculating average over each one. 

24 | Calculate average values for each base pair and save them in a .csv file. 

25 

26 Args: 

27 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

28 output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.csv>`_. Accepted formats: csv (edam:format_3752). 

29 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.jpg>`_. Accepted formats: jpg (edam:format_3579). 

30 properties (dict): 

31 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

32 * **helpar_name** (*str*) - (Optional) helical parameter name. 

33 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series. 

34 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

35 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

36 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

37 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

38 Examples: 

39 This is a use example of how to use the building block from Python:: 

40 

41 from biobb_dna.dna.dna_averages import dna_averages 

42 

43 prop = { 

44 'helpar_name': 'twist', 

45 'seqpos': [1,2], 

46 'sequence': 'GCAT' 

47 } 

48 dna_averages( 

49 input_ser_path='/path/to/twist.ser', 

50 output_csv_path='/path/to/table/output.csv', 

51 output_jpg_path='/path/to/table/output.jpg', 

52 properties=prop) 

53 

54 Info: 

55 * wrapped_software: 

56 * name: In house 

57 * license: Apache-2.0 

58 * ontology: 

59 * name: EDAM 

60 * schema: http://edamontology.org/EDAM.owl 

61 

62 """ 

63 

64 def __init__( 

65 self, 

66 input_ser_path, 

67 output_csv_path, 

68 output_jpg_path, 

69 properties=None, 

70 **kwargs, 

71 ) -> None: 

72 properties = properties or {} 

73 

74 # Call parent class constructor 

75 super().__init__(properties) 

76 self.locals_var_dict = locals().copy() 

77 

78 # Input/Output files 

79 self.io_dict = { 

80 "in": { 

81 "input_ser_path": input_ser_path, 

82 }, 

83 "out": { 

84 "output_csv_path": output_csv_path, 

85 "output_jpg_path": output_jpg_path, 

86 }, 

87 } 

88 

89 # Properties specific for BB 

90 self.properties = properties 

91 self.sequence = properties.get("sequence", None) 

92 self.stride = properties.get("stride", 1000) 

93 self.seqpos = [ 

94 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

95 ] 

96 self.helpar_name = properties.get("helpar_name", None) 

97 

98 # Check the properties 

99 self.check_properties(properties) 

100 self.check_arguments() 

101 

102 @launchlogger 

103 def launch(self) -> int: 

104 """Execute the :class:`HelParAverages <dna.averages.HelParAverages>` object.""" 

105 

106 # Setup Biobb 

107 if self.check_restart(): 

108 return 0 

109 self.stage_files() 

110 

111 # check sequence 

112 if self.sequence is None or len(self.sequence) < 2: 

113 raise ValueError("sequence is null or too short!") 

114 

115 # get helical parameter from filename if not specified 

116 if self.helpar_name is None: 

117 for hp in constants.helical_parameters: 

118 ser_name = Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() 

119 if hp.lower() in ser_name: 

120 self.helpar_name = hp 

121 if self.helpar_name is None: 

122 raise ValueError( 

123 "Helical parameter name can't be inferred from file, " 

124 "so it must be specified!" 

125 ) 

126 else: 

127 if self.helpar_name not in constants.helical_parameters: 

128 raise ValueError( 

129 "Helical parameter name is invalid! " 

130 f"Options: {constants.helical_parameters}" 

131 ) 

132 

133 # get base length and unit from helical parameter name 

134 if self.helpar_name.lower() in constants.hp_basepairs: 

135 self.baselen = 1 

136 elif self.helpar_name.lower() in constants.hp_singlebases: 

137 self.baselen = 0 

138 if self.helpar_name in constants.hp_angular: 

139 self.hp_unit = "Degrees" 

140 else: 

141 self.hp_unit = "Angstroms" 

142 

143 # check seqpos 

144 if self.seqpos: 

145 if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): 

146 raise ValueError( 

147 f"seqpos values must be between 1 and {len(self.sequence) - 2}" 

148 ) 

149 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

150 raise ValueError("seqpos must be a list of at least two integers") 

151 else: 

152 self.seqpos = None # type: ignore 

153 

154 # read input .ser file 

155 ser_data = read_series( 

156 self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos 

157 ) 

158 if not self.seqpos: 

159 ser_data = ser_data[ser_data.columns[1:-1]] 

160 # discard first and last base(pairs) from sequence 

161 sequence = self.sequence[1:] 

162 xlabels = [ 

163 f"{sequence[i:i+1+self.baselen]}" 

164 for i in range(len(ser_data.columns) - self.baselen) 

165 ] 

166 else: 

167 sequence = self.sequence 

168 xlabels = [f"{sequence[i:i+1+self.baselen]}" for i in self.seqpos] 

169 

170 # rename duplicated subunits 

171 while any(pd.Index(ser_data.columns).duplicated()): 

172 ser_data.columns = [ 

173 name if not duplicated else name + "_dup" 

174 for duplicated, name in zip( 

175 pd.Index(ser_data.columns).duplicated(), ser_data.columns 

176 ) 

177 ] 

178 

179 # write output files for all selected bases 

180 means = ser_data.mean(axis=0).iloc[: len(xlabels)] 

181 stds = ser_data.std(axis=0).iloc[: len(xlabels)] 

182 

183 # save plot 

184 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

185 axs.errorbar( 

186 means.index, means.to_numpy(), yerr=stds.to_numpy(), marker="o", capsize=5 

187 ) 

188 axs.set_xticks(means.index) 

189 axs.set_xticklabels(xlabels, rotation=90) 

190 axs.set_xlabel("Sequence Base Pair " f"{'Step' if self.baselen==1 else ''}") 

191 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

192 axs.set_title( 

193 "Base Pair " 

194 f"{'Step' if self.baselen==1 else ''} " 

195 f"Helical Parameter: {self.helpar_name.capitalize()}" 

196 ) 

197 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") 

198 

199 # save table 

200 dataset = pd.DataFrame( 

201 { 

202 f"Base Pair {'Step' if self.baselen==1 else ''}": xlabels, 

203 "mean": means.to_numpy(), 

204 "std": stds.to_numpy(), 

205 } 

206 ) 

207 dataset.to_csv(self.stage_io_dict["out"]["output_csv_path"], index=False) 

208 

209 plt.close() 

210 

211 # Copy files to host 

212 self.copy_to_host() 

213 

214 # Remove temporary file(s) 

215 # self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) 

216 self.remove_tmp_files() 

217 

218 self.check_arguments(output_files_created=True, raise_exception=False) 

219 

220 return 0 

221 

222 

223def dna_averages( 

224 input_ser_path: str, 

225 output_csv_path: str, 

226 output_jpg_path: str, 

227 properties: Optional[dict] = None, 

228 **kwargs, 

229) -> int: 

230 """Create :class:`HelParAverages <dna.dna_averages.HelParAverages>` class and 

231 execute the :meth:`launch() <dna.dna_averages.HelParAverages.launch>` method.""" 

232 

233 return HelParAverages( 

234 input_ser_path=input_ser_path, 

235 output_csv_path=output_csv_path, 

236 output_jpg_path=output_jpg_path, 

237 properties=properties, 

238 **kwargs, 

239 ).launch() 

240 

241 dna_averages.__doc__ = HelParAverages.__doc__ 

242 

243 

244def main(): 

245 """Command line execution of this building block. Please check the command line documentation.""" 

246 parser = argparse.ArgumentParser( 

247 description="Load helical parameter file and calculate average values for each base pair.", 

248 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), 

249 ) 

250 parser.add_argument("--config", required=False, help="Configuration file") 

251 

252 required_args = parser.add_argument_group("required arguments") 

253 required_args.add_argument( 

254 "--input_ser_path", 

255 required=True, 

256 help="Helical parameter input ser file path. Accepted formats: ser.", 

257 ) 

258 required_args.add_argument( 

259 "--output_csv_path", 

260 required=True, 

261 help="Path to output csv file. Accepted formats: csv.", 

262 ) 

263 required_args.add_argument( 

264 "--output_jpg_path", 

265 required=True, 

266 help="Path to output jpg file. Accepted formats: jpg.", 

267 ) 

268 

269 args = parser.parse_args() 

270 args.config = args.config or "{}" 

271 properties = settings.ConfReader(config=args.config).get_prop_dic() 

272 

273 dna_averages( 

274 input_ser_path=args.input_ser_path, 

275 output_csv_path=args.output_csv_path, 

276 output_jpg_path=args.output_jpg_path, 

277 properties=properties, 

278 ) 

279 

280 

281if __name__ == "__main__": 

282 main()