Coverage for biobb_dna/interbp_correlations/interseqcorr.py: 72%

96 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the InterSequenceCorrelation class and the command line interface.""" 

4import argparse 

5from pathlib import Path 

6 

7import numpy as np 

8import matplotlib.pyplot as plt 

9 

10from biobb_common.generic.biobb_object import BiobbObject 

11from biobb_common.configuration import settings 

12from biobb_common.tools import file_utils as fu 

13from biobb_common.tools.file_utils import launchlogger 

14from biobb_dna.utils.loader import read_series 

15from biobb_dna.utils import constants 

16 

17 

class InterSequenceCorrelation(BiobbObject):
    """
    | biobb_dna InterSequenceCorrelation
    | Calculate correlation between all base pairs of a single sequence and for a single helical parameter.

    Args:
        input_ser_path (str): Path to .ser file with data for single helical parameter. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
        output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_seqcorr_roll.csv>`_. Accepted formats: csv (edam:format_3752).
        output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_seqcorr_roll.jpg>`_. Accepted formats: jpg (edam:format_3579).
        properties (dict):
            * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
            * **helpar_name** (*str*) - (None) helical parameter name to add to plot title.
            * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_dna.interbp_correlations.interseqcorr import interseqcorr

            prop = {
                "helpar_name": "helpar",
                "sequence": "CGTAATCG"
            }
            interseqcorr(
                input_ser_path='path/to/input/file.ser',
                output_csv_path='path/to/output/file.csv',
                output_jpg_path='path/to/output/plot.jpg',
                properties=prop)
    Info:
        * wrapped_software:
            * name: In house
            * license: Apache-2.0
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(
            self, input_ser_path, output_csv_path,
            output_jpg_path, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Must be taken inside __init__ itself: it snapshots the constructor
        # arguments for the parent class machinery.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            'in': {
                'input_ser_path': input_ser_path
            },
            'out': {
                'output_csv_path': output_csv_path,
                'output_jpg_path': output_jpg_path
            }
        }

        self.properties = properties
        self.sequence = properties.get("sequence", None)
        self.seqpos = properties.get("seqpos", None)
        self.helpar_name = properties.get("helpar_name", None)

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`InterSequenceCorrelation <interbp_correlations.interseqcorr.InterSequenceCorrelation>` object.

        Reads the input .ser file, computes the pairwise correlation matrix
        between its columns, writes it as CSV and renders it as a JPG heat map.

        Returns:
            int: 0 on completion, or immediately when ``check_restart()``
            reports that execution should be skipped.

        Raises:
            ValueError: if ``sequence`` is missing or shorter than two
                characters, if the helical parameter name is invalid or
                cannot be inferred from the input file name, or if ``seqpos``
                is not a list of at least two integers.
        """

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # check sequence
        if self.sequence is None or len(self.sequence) < 2:
            raise ValueError("sequence is null or too short!")

        # validate the helical parameter name, or infer it from the filename
        self.helpar_name = self._resolve_helpar_name()

        # Select the correlation method from the kind of helical parameter.
        # NOTE: the previous code had these two branches swapped, sending
        # angular parameters to Pearson and everything else to `circular`,
        # which converts its inputs from degrees to radians. Angular series
        # are the ones that need the circular coefficient.
        if self.helpar_name in constants.hp_angular:
            self.method = self.circular
        else:
            self.method = "pearson"

        # check seqpos
        self._check_seqpos()

        # Creating temporary folder
        self.tmp_folder = fu.create_unique_dir(prefix="bpcorrelation_")
        fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log)

        # read input .ser file
        ser_data = read_series(
            self.io_dict['in']['input_ser_path'],
            usecols=self.seqpos)
        if self.seqpos is None:
            # discard first and last base(pairs) from strands
            ser_data = ser_data[ser_data.columns[1:-1]]
            sequence = self.sequence[1:]
            labels = [
                f"{i+1}_{sequence[i:i+2]}"
                for i in range(len(ser_data.columns))]
        else:
            labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
        ser_data.columns = labels

        # make matrix
        corr_data = ser_data.corr(method=self.method)

        # save csv data
        corr_data.to_csv(self.io_dict["out"]["output_csv_path"])

        # create heatmap
        self._plot_heatmap(corr_data, labels)

        # Remove temporary file(s)
        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir"),
            self.tmp_folder
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

    def _resolve_helpar_name(self):
        """Return the helical parameter name, validated or inferred from the input file name."""
        if self.helpar_name is not None:
            if self.helpar_name not in constants.helical_parameters:
                raise ValueError(
                    "Helical parameter name is invalid! "
                    f"Options: {constants.helical_parameters}")
            return self.helpar_name

        # Computed once instead of on every loop iteration.
        filename = Path(self.io_dict['in']['input_ser_path']).name.lower()
        inferred = None
        for hp in constants.helical_parameters:
            if hp.lower() in filename:
                # Deliberately no `break`: when several names match, the last
                # one wins, exactly as in the previous implementation.
                inferred = hp
        if inferred is None:
            raise ValueError(
                "Helical parameter name can't be inferred from file, "
                "so it must be specified!")
        return inferred

    def _check_seqpos(self):
        """Raise ValueError unless ``seqpos`` is None or a list of at least two integers."""
        if self.seqpos is None:
            return
        is_valid = (
            isinstance(self.seqpos, list)
            and len(self.seqpos) > 1
            # Non-integer entries used to fail later with a TypeError while
            # building the labels; reject them here with the documented error.
            and all(isinstance(pos, (int, np.integer)) for pos in self.seqpos))
        if not is_valid:
            raise ValueError(
                "seqpos must be a list of at least two integers")

    def _plot_heatmap(self, corr_data, labels):
        """Render ``corr_data`` as an annotated heat map and save it to the output JPG path."""
        fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
        try:
            axs.pcolor(corr_data)
            # Annotate each cell with its value. Reading positionally from
            # the value matrix keeps this working even if labels repeat.
            for (row, col), value in np.ndenumerate(corr_data.to_numpy()):
                axs.text(
                    col+.5,
                    row+.5,
                    f"{value:.2f}",
                    ha="center",
                    va="center",
                    color="w")
            # Ticks sit at the centre of each cell.
            ticks = [i + 0.5 for i in range(len(corr_data))]
            axs.set_xticks(ticks)
            axs.set_xticklabels(labels, rotation=90)
            axs.set_yticks(ticks)
            axs.set_yticklabels(labels)
            axs.set_title(
                "Base Pair Correlation "
                f"for Helical Parameter \'{self.helpar_name}\'")
            fig.tight_layout()
            fig.savefig(
                self.io_dict['out']['output_jpg_path'],
                format="jpg")
        finally:
            # Close this specific figure, also when plotting or saving fails.
            plt.close(fig)

    @staticmethod
    def circular(x1, x2):
        """Return a circular correlation coefficient between two series given in degrees.

        Both inputs are converted to radians; the coefficient is the sum of
        products of the sines of the deviations from each mean, divided by
        the square root of the product of their sums of squares.

        There is no guard against a zero denominator (e.g. a constant series).
        """
        x1 = x1 * np.pi / 180
        x2 = x2 * np.pi / 180
        diff_1 = np.sin(x1 - x1.mean())
        diff_2 = np.sin(x2 - x2.mean())
        num = (diff_1 * diff_2).sum()
        den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum())
        return num / den

198 

199 

def interseqcorr(
        input_ser_path: str,
        output_csv_path: str, output_jpg_path: str,
        properties: dict = None, **kwargs) -> int:
    """Create :class:`InterSequenceCorrelation <interbp_correlations.interseqcorr.InterSequenceCorrelation>` class and
    execute the :meth:`launch() <interbp_correlations.interseqcorr.InterSequenceCorrelation.launch>` method.

    All arguments are forwarded unchanged to the class constructor; the
    value returned by ``launch()`` is returned to the caller.
    """
    building_block = InterSequenceCorrelation(
        input_ser_path,
        output_csv_path,
        output_jpg_path,
        properties=properties,
        **kwargs)
    return building_block.launch()

212 

213 

def main():
    """Command line execution of this building block. Please check the command line documentation."""

    def wide_formatter(prog):
        # Very large width so argparse never wraps the help text.
        return argparse.RawTextHelpFormatter(prog, width=99999)

    parser = argparse.ArgumentParser(
        description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.',
        formatter_class=wide_formatter)
    parser.add_argument('--config', required=False, help='Configuration file')

    # The three mandatory file paths, registered in their own help group.
    required_args = parser.add_argument_group('required arguments')
    mandatory_options = (
        ('--input_ser_path', 'Path to ser file with inputs. Accepted formats: ser.'),
        ('--output_csv_path', 'Path to output file. Accepted formats: csv.'),
        ('--output_jpg_path', 'Path to output plot. Accepted formats: jpg.'),
    )
    for flag, help_text in mandatory_options:
        required_args.add_argument(flag, required=True, help=help_text)

    args = parser.parse_args()
    # Fall back to an empty JSON object when no configuration is supplied.
    if not args.config:
        args.config = "{}"
    conf_reader = settings.ConfReader(config=args.config)
    properties = conf_reader.get_prop_dic()

    interseqcorr(
        input_ser_path=args.input_ser_path,
        output_csv_path=args.output_csv_path,
        output_jpg_path=args.output_jpg_path,
        properties=properties)

237 

238 

# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()