Coverage for biobb_dna/dna/dna_averages.py: 74%

97 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the HelParAverages class and the command line interface.""" 

4import shutil 

5import argparse 

6from pathlib import Path 

7 

8import matplotlib.pyplot as plt 

9import pandas as pd 

10from biobb_dna.utils import constants 

11from biobb_dna.utils.loader import read_series 

12from biobb_common.generic.biobb_object import BiobbObject 

13from biobb_common.tools.file_utils import launchlogger 

14from biobb_common.tools import file_utils as fu 

15from biobb_common.configuration import settings 

16 

17 

18class HelParAverages(BiobbObject): 

19 """ 

20 | biobb_dna HelParAverages 

21 | Load .ser file for a given helical parameter and read each column corresponding to a base calculating average over each one. 

22 

23 Args: 

24 input_ser_path (str): Path to .ser file for helical parameter. File is expected to be a table, with the first column being an index and the rest the helical parameter values for each base/basepair. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/dna/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330). 

25 output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.csv>`_. Accepted formats: csv (edam:format_3752). 

26 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/dna/shift_avg.jpg>`_. Accepted formats: jpg (edam:format_3579). 

27 properties (dict): 

28 * **sequence** (*str*) - (None) Nucleic acid sequence corresponding to the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

29 * **helpar_name** (*str*) - (Optional) helical parameter name. 

30 * **stride** (*int*) - (1000) granularity of the number of snapshots for plotting time series. 

31 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

32 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

33 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

34 Examples: 

35 This is a use example of how to use the building block from Python:: 

36 

37 from biobb_dna.dna.dna_averages import dna_averages 

38 

39 prop = { 

40 'helpar_name': 'twist', 

41 'seqpos': [1,2], 

42 'sequence': 'GCAT' 

43 } 

44 dna_averages( 

45 input_ser_path='/path/to/twist.ser', 

46 output_csv_path='/path/to/table/output.csv', 

47 output_jpg_path='/path/to/table/output.jpg', 

48 properties=prop) 

49 

50 Info: 

51 * wrapped_software: 

52 * name: In house 

53 * license: Apache-2.0 

54 * ontology: 

55 * name: EDAM 

56 * schema: http://edamontology.org/EDAM.owl 

57 

58 """ 

59 

60 def __init__(self, input_ser_path, output_csv_path, output_jpg_path, 

61 properties=None, **kwargs) -> None: 

62 properties = properties or {} 

63 

64 # Call parent class constructor 

65 super().__init__(properties) 

66 self.locals_var_dict = locals().copy() 

67 

68 # Input/Output files 

69 self.io_dict = { 

70 'in': { 

71 'input_ser_path': input_ser_path, 

72 }, 

73 'out': { 

74 'output_csv_path': output_csv_path, 

75 'output_jpg_path': output_jpg_path 

76 } 

77 } 

78 

79 # Properties specific for BB 

80 self.properties = properties 

81 self.sequence = properties.get("sequence", None) 

82 self.stride = properties.get( 

83 "stride", 1000) 

84 self.seqpos = properties.get( 

85 "seqpos", None) 

86 self.helpar_name = properties.get( 

87 "helpar_name", None) 

88 

89 # Check the properties 

90 self.check_properties(properties) 

91 self.check_arguments() 

92 

93 @launchlogger 

94 def launch(self) -> int: 

95 """Execute the :class:`HelParAverages <dna.averages.HelParAverages>` object.""" 

96 

97 # Setup Biobb 

98 if self.check_restart(): 

99 return 0 

100 self.stage_files() 

101 

102 # check sequence 

103 if self.sequence is None or len(self.sequence) < 2: 

104 raise ValueError("sequence is null or too short!") 

105 

106 # get helical parameter from filename if not specified 

107 if self.helpar_name is None: 

108 for hp in constants.helical_parameters: 

109 ser_name = Path( 

110 self.io_dict['in']['input_ser_path']).name.lower() 

111 if hp.lower() in ser_name: 

112 self.helpar_name = hp 

113 if self.helpar_name is None: 

114 raise ValueError( 

115 "Helical parameter name can't be inferred from file, " 

116 "so it must be specified!") 

117 else: 

118 if self.helpar_name not in constants.helical_parameters: 

119 raise ValueError( 

120 "Helical parameter name is invalid! " 

121 f"Options: {constants.helical_parameters}") 

122 

123 # get base length and unit from helical parameter name 

124 if self.helpar_name.lower() in constants.hp_basepairs: 

125 self.baselen = 1 

126 elif self.helpar_name.lower() in constants.hp_singlebases: 

127 self.baselen = 0 

128 if self.helpar_name in constants.hp_angular: 

129 self.hp_unit = "Degrees" 

130 else: 

131 self.hp_unit = "Angstroms" 

132 

133 # check seqpos 

134 if self.seqpos is not None: 

135 if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): 

136 raise ValueError( 

137 f"seqpos values must be between 1 and {len(self.sequence) - 2}") 

138 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

139 raise ValueError( 

140 "seqpos must be a list of at least two integers") 

141 

142 # Creating temporary folder 

143 self.tmp_folder = fu.create_unique_dir(prefix="averages_") 

144 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log) 

145 # Copy input_file_path1 to temporary folder 

146 shutil.copy(self.io_dict['in']['input_ser_path'], self.tmp_folder) 

147 

148 # read input .ser file 

149 ser_data = read_series( 

150 self.io_dict['in']['input_ser_path'], 

151 usecols=self.seqpos) 

152 if self.seqpos is None: 

153 ser_data = ser_data[ser_data.columns[1:-1]] 

154 # discard first and last base(pairs) from sequence 

155 sequence = self.sequence[1:] 

156 xlabels = [ 

157 f"{sequence[i:i+1+self.baselen]}" 

158 for i in range(len(ser_data.columns) - self.baselen)] 

159 else: 

160 sequence = self.sequence 

161 xlabels = [ 

162 f"{sequence[i:i+1+self.baselen]}" 

163 for i in self.seqpos] 

164 

165 # rename duplicated subunits 

166 while any(ser_data.columns.duplicated()): 

167 ser_data.columns = [ 

168 name if not duplicated else name + "_dup" 

169 for duplicated, name 

170 in zip(ser_data.columns.duplicated(), ser_data.columns)] 

171 

172 # write output files for all selected bases 

173 means = ser_data.mean(axis=0).iloc[:len(xlabels)] 

174 stds = ser_data.std(axis=0).iloc[:len(xlabels)] 

175 

176 # save plot 

177 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

178 axs.errorbar( 

179 means.index, 

180 means.to_numpy(), 

181 yerr=stds.to_numpy(), 

182 marker="o", 

183 capsize=5) 

184 axs.set_xticks(means.index) 

185 axs.set_xticklabels(xlabels, rotation=90) 

186 axs.set_xlabel( 

187 "Sequence Base Pair " 

188 f"{'Step' if self.baselen==1 else ''}") 

189 axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") 

190 axs.set_title( 

191 "Base Pair " 

192 f"{'Step' if self.baselen==1 else ''} " 

193 f"Helical Parameter: {self.helpar_name.capitalize()}") 

194 fig.savefig( 

195 self.io_dict['out']['output_jpg_path'], 

196 format="jpg") 

197 

198 # save table 

199 dataset = pd.DataFrame({ 

200 f"Base Pair {'Step' if self.baselen==1 else ''}": xlabels, 

201 "mean": means.to_numpy(), 

202 "std": stds.to_numpy()}) 

203 dataset.to_csv( 

204 self.io_dict['out']['output_csv_path'], 

205 index=False) 

206 

207 plt.close() 

208 

209 # Remove temporary file(s) 

210 self.tmp_files.extend([ 

211 self.stage_io_dict.get("unique_dir"), 

212 self.tmp_folder 

213 ]) 

214 self.remove_tmp_files() 

215 

216 self.check_arguments(output_files_created=True, raise_exception=False) 

217 

218 return 0 

219 

220 

221def dna_averages( 

222 input_ser_path: str, output_csv_path: str, output_jpg_path: str, 

223 properties: dict = None, **kwargs) -> int: 

224 """Create :class:`HelParAverages <dna.dna_averages.HelParAverages>` class and 

225 execute the :meth:`launch() <dna.dna_averages.HelParAverages.launch>` method.""" 

226 

227 return HelParAverages(input_ser_path=input_ser_path, 

228 output_csv_path=output_csv_path, 

229 output_jpg_path=output_jpg_path, 

230 properties=properties, **kwargs).launch() 

231 

232 

233def main(): 

234 """Command line execution of this building block. Please check the command line documentation.""" 

235 parser = argparse.ArgumentParser(description='Load helical parameter file and calculate average values for each base pair.', 

236 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

237 parser.add_argument('--config', required=False, help='Configuration file') 

238 

239 required_args = parser.add_argument_group('required arguments') 

240 required_args.add_argument('--input_ser_path', required=True, 

241 help='Helical parameter input ser file path. Accepted formats: ser.') 

242 required_args.add_argument('--output_csv_path', required=True, 

243 help='Path to output csv file. Accepted formats: csv.') 

244 required_args.add_argument('--output_jpg_path', required=True, 

245 help='Path to output jpg file. Accepted formats: jpg.') 

246 

247 args = parser.parse_args() 

248 args.config = args.config or "{}" 

249 properties = settings.ConfReader(config=args.config).get_prop_dic() 

250 

251 dna_averages( 

252 input_ser_path=args.input_ser_path, 

253 output_csv_path=args.output_csv_path, 

254 output_jpg_path=args.output_jpg_path, 

255 properties=properties) 

256 

257 

258if __name__ == '__main__': 

259 main()