Coverage for biobb_dna/interbp_correlations/interseqcorr.py: 71%

96 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 10:36 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the InterSequenceCorrelation class and the command line interface.""" 

4 

5import argparse 

6from pathlib import Path 

7from typing import Optional 

8 

9import matplotlib.pyplot as plt 

10import numpy as np 

11from biobb_common.configuration import settings 

12from biobb_common.generic.biobb_object import BiobbObject 

13from biobb_common.tools.file_utils import launchlogger 

14 

15from biobb_dna.utils import constants 

16from biobb_dna.utils.common import _from_string_to_list 

17from biobb_dna.utils.loader import read_series 

18 

19 

20class InterSequenceCorrelation(BiobbObject): 

21 """ 

22 | biobb_dna InterSequenceCorrelation 

23 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter. 

24 | Calculate correlation between all base pairs of a single sequence and for a single helical parameter. 

25 

26 Args: 

27 input_ser_path (str): Path to .ser file with data for single helical parameter. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330). 

28 output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_seqcorr_roll.csv>`_. Accepted formats: csv (edam:format_3752). 

29 output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_seqcorr_roll.jpg>`_. Accepted formats: jpg (edam:format_3579). 

30 properties (dict): 

31 * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option). 

32 * **helpar_name** (*str*) - (None) helical parameter name to add to plot title. 

33 * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence. 

34 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

35 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

36 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

37 

38 Examples: 

39 This is a use example of how to use the building block from Python:: 

40 

41 from biobb_dna.interbp_correlations.interseqcorr import interseqcorr 

42 

43 prop = { 

44 "helpar_name": "helpar", 

45 "sequence": "CGTAATCG" 

46 } 

47 interseqcorr( 

48 input_ser_path='path/to/input/file.ser', 

49 output_csv_path='path/to/output/file.csv', 

50 output_jpg_path='path/to/output/plot.jpg', 

51 properties=prop) 

52 Info: 

53 * wrapped_software: 

54 * name: In house 

55 * license: Apache-2.0 

56 * ontology: 

57 * name: EDAM 

58 * schema: http://edamontology.org/EDAM.owl 

59 

60 """ 

61 

62 def __init__( 

63 self, 

64 input_ser_path, 

65 output_csv_path, 

66 output_jpg_path, 

67 properties=None, 

68 **kwargs, 

69 ) -> None: 

70 properties = properties or {} 

71 

72 # Call parent class constructor 

73 super().__init__(properties) 

74 self.locals_var_dict = locals().copy() 

75 

76 # Input/Output files 

77 self.io_dict = { 

78 "in": {"input_ser_path": input_ser_path}, 

79 "out": { 

80 "output_csv_path": output_csv_path, 

81 "output_jpg_path": output_jpg_path, 

82 }, 

83 } 

84 

85 self.properties = properties 

86 self.sequence = properties.get("sequence", None) 

87 self.seqpos = [ 

88 int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) 

89 ] 

90 self.helpar_name = properties.get("helpar_name", None) 

91 

92 # Check the properties 

93 self.check_properties(properties) 

94 self.check_arguments() 

95 

96 @launchlogger 

97 def launch(self) -> int: 

98 """Execute the :class:`HelParCorrelation <interbp_correlations.interseqcorr.InterSequenceCorrelation>` object.""" 

99 

100 # Setup Biobb 

101 if self.check_restart(): 

102 return 0 

103 self.stage_files() 

104 

105 # check sequence 

106 if self.sequence is None or len(self.sequence) < 2: 

107 raise ValueError("sequence is null or too short!") 

108 

109 # get helical parameter from filename if not specified 

110 if self.helpar_name is None: 

111 for hp in constants.helical_parameters: 

112 if ( 

113 hp.lower() 

114 in Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() 

115 ): 

116 self.helpar_name = hp 

117 if self.helpar_name is None: 

118 raise ValueError( 

119 "Helical parameter name can't be inferred from file, " 

120 "so it must be specified!" 

121 ) 

122 else: 

123 if self.helpar_name not in constants.helical_parameters: 

124 raise ValueError( 

125 "Helical parameter name is invalid! " 

126 f"Options: {constants.helical_parameters}" 

127 ) 

128 

129 # get base length and unit from helical parameter name 

130 if self.helpar_name in constants.hp_angular: 

131 self.method = "pearson" 

132 else: 

133 self.method = self.circular # type: ignore 

134 

135 # check seqpos 

136 if self.seqpos: 

137 if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): 

138 raise ValueError("seqpos must be a list of at least two integers") 

139 else: 

140 self.seqpos = None # type: ignore 

141 

142 # read input .ser file 

143 ser_data = read_series( 

144 self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos 

145 ) 

146 if not self.seqpos: 

147 ser_data = ser_data[ser_data.columns[1:-1]] 

148 # discard first and last base(pairs) from strands 

149 sequence = self.sequence[1:] 

150 labels = [f"{i+1}_{sequence[i:i+2]}" for i in range(len(ser_data.columns))] 

151 else: 

152 labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos] 

153 ser_data.columns = labels 

154 

155 # make matrix 

156 corr_data = ser_data.corr(method=self.method) 

157 

158 # save csv data 

159 corr_data.to_csv(self.stage_io_dict["out"]["output_csv_path"]) 

160 

161 # create heatmap 

162 fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) 

163 axs.pcolor(corr_data) 

164 # Loop over data dimensions and create text annotations. 

165 for i in range(len(corr_data)): 

166 for j in range(len(corr_data)): 

167 axs.text( 

168 j + 0.5, 

169 i + 0.5, 

170 f"{corr_data[corr_data.columns[j]].iloc[i]:.2f}", 

171 ha="center", 

172 va="center", 

173 color="w", 

174 ) 

175 axs.set_xticks([i + 0.5 for i in range(len(corr_data))]) 

176 axs.set_xticklabels(labels, rotation=90) 

177 axs.set_yticks([i + 0.5 for i in range(len(corr_data))]) 

178 axs.set_yticklabels(labels) 

179 axs.set_title( 

180 "Base Pair Correlation " f"for Helical Parameter '{self.helpar_name}'" 

181 ) 

182 fig.tight_layout() 

183 fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") 

184 plt.close() 

185 

186 # Copy files to host 

187 self.copy_to_host() 

188 

189 # Remove temporary file(s) 

190 # self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) 

191 self.remove_tmp_files() 

192 

193 self.check_arguments(output_files_created=True, raise_exception=False) 

194 

195 return 0 

196 

197 @staticmethod 

198 def circular(x1, x2): 

199 x1 = x1 * np.pi / 180 

200 x2 = x2 * np.pi / 180 

201 diff_1 = np.sin(x1 - x1.mean()) 

202 diff_2 = np.sin(x2 - x2.mean()) 

203 num = (diff_1 * diff_2).sum() 

204 den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) 

205 return num / den 

206 

207 

208def interseqcorr( 

209 input_ser_path: str, 

210 output_csv_path: str, 

211 output_jpg_path: str, 

212 properties: Optional[dict] = None, 

213 **kwargs, 

214) -> int: 

215 """Create :class:`HelParCorrelation <interbp_correlations.interseqcorr.InterSequenceCorrelation>` class and 

216 execute the :meth:`launch() <interbp_correlations.interseqcorr.InterSequenceCorrelation.launch>` method.""" 

217 

218 return InterSequenceCorrelation( 

219 input_ser_path=input_ser_path, 

220 output_csv_path=output_csv_path, 

221 output_jpg_path=output_jpg_path, 

222 properties=properties, 

223 **kwargs, 

224 ).launch() 

225 

226 interseqcorr.__doc__ = InterSequenceCorrelation.__doc__ 

227 

228 

229def main(): 

230 """Command line execution of this building block. Please check the command line documentation.""" 

231 parser = argparse.ArgumentParser( 

232 description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", 

233 formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), 

234 ) 

235 parser.add_argument("--config", required=False, help="Configuration file") 

236 

237 required_args = parser.add_argument_group("required arguments") 

238 required_args.add_argument( 

239 "--input_ser_path", 

240 required=True, 

241 help="Path to ser file with inputs. Accepted formats: ser.", 

242 ) 

243 required_args.add_argument( 

244 "--output_csv_path", 

245 required=True, 

246 help="Path to output file. Accepted formats: csv.", 

247 ) 

248 required_args.add_argument( 

249 "--output_jpg_path", 

250 required=True, 

251 help="Path to output plot. Accepted formats: jpg.", 

252 ) 

253 

254 args = parser.parse_args() 

255 args.config = args.config or "{}" 

256 properties = settings.ConfReader(config=args.config).get_prop_dic() 

257 

258 interseqcorr( 

259 input_ser_path=args.input_ser_path, 

260 output_csv_path=args.output_csv_path, 

261 output_jpg_path=args.output_jpg_path, 

262 properties=properties, 

263 ) 

264 

265 

266if __name__ == "__main__": 

267 main()