Coverage for biobb_dna / dna / dna_averages.py: 81%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-15 18:49 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the HelParAverages class and the command line interface.""" 

4 

5from pathlib import Path 

6from typing import Optional 

7 

8import matplotlib.pyplot as plt 

9import pandas as pd 

10from biobb_common.generic.biobb_object import BiobbObject 

11from biobb_common.tools.file_utils import launchlogger 

12 

13from biobb_dna.utils import constants 

14from biobb_dna.utils.common import _from_string_to_list 

15from biobb_dna.utils.loader import read_series 

16 

17 

18class HelParAverages(BiobbObject): 

19 """ 

20 | biobb_dna HelParAverages 

21 | Load .ser file for a given helical parameter and read each column corresponding to a base calculating average over each one. 

22 | Calculate average values for each base pair and save them in a .csv file. 

23 

24 Args: 

25 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

26 output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.csv>`_. Accepted formats: csv (edam:format_3752). 

27 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.jpg>`_. Accepted formats: jpg (edam:format_3579). 

28 properties (dict): 

29 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

30 * **helpar_name** (*str*) - (Optional) helical parameter name. 

31 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series. 

32 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

33 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

34 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

35 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

36 Examples: 

37 This is a use example of how to use the building block from Python:: 

38 

39 from biobb_dna.dna.dna_averages import dna_averages 

40 

41 prop = { 

42 'helpar_name': 'twist', 

43 'seqpos': [1,2], 

44 'sequence': 'GCAT' 

45 } 

46 dna_averages( 

47 input_ser_path='/path/to/twist.ser', 

48 output_csv_path='/path/to/table/output.csv', 

49 output_jpg_path='/path/to/table/output.jpg', 

50 properties=prop) 

51 

52 Info: 

53 * wrapped_software: 

54 * name: In house 

55 * license: Apache-2.0 

56 * ontology: 

57 * name: EDAM 

58 * schema: http://edamontology.org/EDAM.owl 

59 

60 """ 

61 

62 def __init__( 

63 self, 

64 input_ser_path, 

65 output_csv_path, 

66 output_jpg_path, 

67 properties=None, 

68 **kwargs, 

69 ) -> None: 

70 properties = properties or {} 

71 

72 # Call parent class constructor 

73 super().__init__(properties) 

74 self.locals_var_dict = locals().copy() 

75 

76 # Input/Output files 

77 self.io_dict = { 

78 "in": { 

79 "input_ser_path": input_ser_path, 

80 }, 

81 "out": { 

82 "output_csv_path": output_csv_path, 

83 "output_jpg_path": output_jpg_path, 

84 }, 

85 } 

86 

87 # Properties specific for BB 

88 self.properties = properties 

89 self.sequence = properties.get("sequence", None) 

90 self.stride = properties.get("stride", 1000) 

91 self.seqpos = [ 

92 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

93 ] 

94 self.helpar_name = properties.get("helpar_name", None) 

95 

96 # Check the properties 

97 self.check_properties(properties) 

98 self.check_arguments() 

99 

100 @launchlogger 

101 def launch(self) -> int: 

102 """Execute the :class:`HelParAverages <dna.averages.HelParAverages>` object.""" 

103 

104 # Setup Biobb 

105 if self.check_restart(): 

106 return 0 

107 self.stage_files() 

108 

109 # check sequence 

110 if self.sequence is None or len(self.sequence) < 2: 

111 raise ValueError("sequence is null or too short!") 

112 

113 # get helical parameter from filename if not specified 

114 if self.helpar_name is None: 

115 for hp in constants.helical_parameters: 

116 ser_name = Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() 

117 if hp.lower() in ser_name: 

118 self.helpar_name = hp 

119 if self.helpar_name is None: 

120 raise ValueError( 

121 "Helical parameter name can't be inferred from file, " 

122 "so it must be specified!" 

123 ) 

124 else: 

125 if self.helpar_name not in constants.helical_parameters: 

126 raise ValueError( 

127 "Helical parameter name is invalid! " 

128 f"Options: {constants.helical_parameters}" 

129 ) 

130 

131 # get base length and unit from helical parameter name 

132 if self.helpar_name.lower() in constants.hp_basepairs: 

133 self.baselen = 1 

134 elif self.helpar_name.lower() in constants.hp_singlebases: 

135 self.baselen = 0 

136 if self.helpar_name in constants.hp_angular: 

137 self.hp_unit = "Degrees" 

138 else: 

139 self.hp_unit = "Angstroms" 

140 

141 # check seqpos 

142 if self.seqpos: 

143 if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): 

144 raise ValueError( 

145 f"seqpos values must be between 1 and {len(self.sequence) - 2}" 

146 ) 

147 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

148 raise ValueError("seqpos must be a list of at least two integers") 

149 else: 

150 self.seqpos = None # type: ignore 

151 

152 # read input .ser file 

153 ser_data = read_series( 

154 self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos 

155 ) 

156 if not self.seqpos: 

157 ser_data = ser_data[ser_data.columns[1:-1]] 

158 # discard first and last base(pairs) from sequence 

159 sequence = self.sequence[1:] 

160 xlabels = [ 

161 f"{sequence[i:i+1+self.baselen]}" 

162 for i in range(len(ser_data.columns) - self.baselen) 

163 ] 

164 else: 

165 sequence = self.sequence 

166 xlabels = [f"{sequence[i:i+1+self.baselen]}" for i in self.seqpos] 

167 

168 # rename duplicated subunits 

169 while any(pd.Index(ser_data.columns).duplicated()): 

170 ser_data.columns = [ 

171 name if not duplicated else name + "_dup" 

172 for duplicated, name in zip( 

173 pd.Index(ser_data.columns).duplicated(), ser_data.columns 

174 ) 

175 ] 

176 

177 # write output files for all selected bases 

178 means = ser_data.mean(axis=0).iloc[: len(xlabels)] 

179 stds = ser_data.std(axis=0).iloc[: len(xlabels)] 

180 

181 # save plot 

182 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

183 axs.errorbar( 

184 means.index, means.to_numpy(), yerr=stds.to_numpy(), marker="o", capsize=5 

185 ) 

186 axs.set_xticks(means.index) 

187 axs.set_xticklabels(xlabels, rotation=90) 

188 axs.set_xlabel("Sequence Base Pair " f"{'Step' if self.baselen == 1 else ''}") 

189 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

190 axs.set_title( 

191 "Base Pair " 

192 f"{'Step' if self.baselen == 1 else ''} " 

193 f"Helical Parameter: {self.helpar_name.capitalize()}" 

194 ) 

195 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") 

196 

197 # save table 

198 dataset = pd.DataFrame( 

199 { 

200 f"Base Pair {'Step' if self.baselen == 1 else ''}": xlabels, 

201 "mean": means.to_numpy(), 

202 "std": stds.to_numpy(), 

203 } 

204 ) 

205 dataset.to_csv(self.stage_io_dict["out"]["output_csv_path"], index=False) 

206 

207 plt.close() 

208 

209 # Copy files to host 

210 self.copy_to_host() 

211 

212 # Remove temporary file(s) 

213 self.remove_tmp_files() 

214 

215 self.check_arguments(output_files_created=True, raise_exception=False) 

216 

217 return 0 

218 

219 

220def dna_averages( 

221 input_ser_path: str, 

222 output_csv_path: str, 

223 output_jpg_path: str, 

224 properties: Optional[dict] = None, 

225 **kwargs, 

226) -> int: 

227 """Create :class:`HelParAverages <dna.dna_averages.HelParAverages>` class and 

228 execute the :meth:`launch() <dna.dna_averages.HelParAverages.launch>` method.""" 

229 return HelParAverages(**dict(locals())).launch() 

230 

231 

232dna_averages.__doc__ = HelParAverages.__doc__ 

233main = HelParAverages.get_main(dna_averages, "Load helical parameter file and calculate average values for each base pair.") 

234 

235if __name__ == '__main__': 

236 main()