Coverage for biobb_dna/intrabp_correlations/intraseqcorr.py: 72%

96 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:06 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the IntraSequenceCorrelation class and the command line interface.""" 

4import argparse 

5from pathlib import Path 

6 

7import numpy as np 

8import matplotlib.pyplot as plt 

9 

10from biobb_common.generic.biobb_object import BiobbObject 

11from biobb_common.configuration import settings 

12from biobb_common.tools import file_utils as fu 

13from biobb_common.tools.file_utils import launchlogger 

14from biobb_dna.utils.loader import read_series 

15from biobb_dna.utils import constants 

16 

17 

class IntraSequenceCorrelation(BiobbObject):
    """
    | biobb_dna IntraSequenceCorrelation
    | Calculate correlation between all intra-base pairs of a single sequence and for a single helical parameter.

    Args:
        input_ser_path (str): Path to .ser file with data for single helical parameter. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_buckle.ser>`_. Accepted formats: ser (edam:format_2330).
        output_csv_path (str): Path to .csv file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_seqcorr_buckle.csv>`_. Accepted formats: csv (edam:format_3752).
        output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/intra_seqcorr_buckle.jpg>`_. Accepted formats: jpg (edam:format_3579).
        properties (dict):
            * **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
            * **helpar_name** (*str*) - (None) helical parameter name to add to plot title.
            * **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze.  If not specified it will analyse the complete sequence.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_dna.intrabp_correlations.intraseqcorr import intraseqcorr

            prop = {
                'sequence': 'GCAACGTGCTATGGAAGC'
            }
            intraseqcorr(
                input_ser_path='path/to/input/file.ser',
                output_csv_path='path/to/output/file.csv',
                output_jpg_path='path/to/output/plot.jpg',
                properties=prop)
    Info:
        * wrapped_software:
            * name: In house
            * license: Apache-2.0
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(
            self, input_ser_path, output_csv_path,
            output_jpg_path, properties=None, **kwargs) -> None:
        """Store the I/O paths and the block properties, then validate them.

        Args:
            input_ser_path: Path to the input .ser file.
            output_csv_path: Path of the .csv file the correlation matrix is written to.
            output_jpg_path: Path of the .jpg file the heatmap is written to.
            properties: Optional dict with ``sequence``, ``seqpos`` and
                ``helpar_name`` (plus the generic workflow properties).
                ``None`` is treated as an empty dict.
            **kwargs: Accepted for interface compatibility; only captured in
                ``locals_var_dict``.
        """
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor arguments; must stay right after the
        # parent constructor so no extra local names leak into it.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            'in': {
                'input_ser_path': input_ser_path
            },
            'out': {
                'output_csv_path': output_csv_path,
                'output_jpg_path': output_jpg_path
            }
        }

        self.properties = properties
        self.sequence = properties.get("sequence", None)
        self.seqpos = properties.get("seqpos", None)
        self.helpar_name = properties.get("helpar_name", None)

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`IntraSequenceCorrelation <intrabp_correlations.intraseqcorr.IntraSequenceCorrelation>` object.

        Reads the .ser file, computes the pairwise correlation matrix between
        its columns, writes it as .csv and saves an annotated heatmap as .jpg.

        Returns:
            int: 0 on completion (also when execution is skipped by restart).

        Raises:
            ValueError: If the sequence is missing or shorter than two bases,
                if the helical parameter name is invalid or cannot be inferred
                from the input file name, or if ``seqpos`` is not a list with
                at least two elements.
        """

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # check sequence
        if self.sequence is None or len(self.sequence) < 2:
            raise ValueError("sequence is null or too short!")

        # get helical parameter from filename if not specified
        if self.helpar_name is None:
            ser_name = Path(self.io_dict['in']['input_ser_path']).name.lower()
            # No early exit: when several names match, the last one wins.
            for hp in constants.helical_parameters:
                if hp.lower() in ser_name:
                    self.helpar_name = hp
            if self.helpar_name is None:
                raise ValueError(
                    "Helical parameter name can't be inferred from file, "
                    "so it must be specified!")
        elif self.helpar_name not in constants.helical_parameters:
            raise ValueError(
                "Helical parameter name is invalid! "
                f"Options: {constants.helical_parameters}")

        # Choose the correlation method from the helical parameter type.
        # `circular` converts degrees to radians, so it only makes sense for
        # angular parameters; every other parameter uses plain Pearson.
        if self.helpar_name in constants.hp_angular:
            self.method = self.circular
        else:
            self.method = "pearson"

        # check seqpos
        if self.seqpos is not None:
            if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
                raise ValueError(
                    "seqpos must be a list of at least two integers")

        # Creating temporary folder
        self.tmp_folder = fu.create_unique_dir(prefix="bpcorrelation_")
        fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log)

        # read input .ser file
        ser_data = read_series(
            self.io_dict['in']['input_ser_path'],
            usecols=self.seqpos)
        if self.seqpos is None:
            # discard first and last base(pairs) from strands
            ser_data = ser_data[ser_data.columns[1:-1]]
            sequence = self.sequence[1:]
            # NOTE(review): numbering restarts at 1 after trimming, so label
            # "1_X" carries the second base of the full sequence.
            labels = [
                f"{i+1}_{sequence[i:i+1]}"
                for i in range(len(ser_data.columns))]
        else:
            labels = [f"{i+1}_{self.sequence[i:i+1]}" for i in self.seqpos]
        ser_data.columns = labels

        # make matrix
        corr_data = ser_data.corr(method=self.method)

        # save csv data
        corr_data.to_csv(self.io_dict["out"]["output_csv_path"])

        # create heatmap
        self._save_heatmap(corr_data, labels)

        # Remove temporary file(s)
        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir"),
            self.tmp_folder
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

    def _save_heatmap(self, corr_data, labels) -> None:
        """Draw the correlation matrix as an annotated heatmap and save it as .jpg.

        Args:
            corr_data: Square correlation DataFrame produced in ``launch``.
            labels: Tick labels, one per row/column of ``corr_data``.
        """
        size = len(corr_data)
        # Ticks sit in the middle of each heatmap cell.
        tick_positions = [k + 0.5 for k in range(size)]
        fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
        try:
            axs.pcolor(corr_data)
            # Loop over data dimensions and create text annotations.
            # Positional access keeps working even if labels are repeated.
            for i in range(size):
                for j in range(size):
                    axs.text(
                        j + .5,
                        i + .5,
                        f"{corr_data.iloc[i, j]:.2f}",
                        ha="center",
                        va="center",
                        color="w")
            axs.set_xticks(tick_positions)
            axs.set_xticklabels(labels, rotation=90)
            axs.set_yticks(tick_positions)
            axs.set_yticklabels(labels)
            axs.set_title(
                "Base Pair Correlation "
                f"for Helical Parameter \'{self.helpar_name}\'")
            fig.tight_layout()
            fig.savefig(
                self.io_dict['out']['output_jpg_path'],
                format="jpg")
        finally:
            # Close this specific figure, also when plotting or saving fails.
            plt.close(fig)

    @staticmethod
    def circular(x1, x2):
        """Return a circular correlation coefficient between two angle series.

        Both inputs are taken to be in degrees: they are converted to
        radians, and the coefficient is the normalised sum of products of the
        sines of each series' deviation from its (arithmetic) mean.

        Args:
            x1: First series of angles, in degrees.
            x2: Second series of angles, in degrees.

        Returns:
            The correlation coefficient as a float.
        """
        x1 = x1 * np.pi / 180
        x2 = x2 * np.pi / 180
        diff_1 = np.sin(x1 - x1.mean())
        diff_2 = np.sin(x2 - x2.mean())
        num = (diff_1 * diff_2).sum()
        den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum())
        return num / den

194 

195 

def intraseqcorr(
        input_ser_path: str,
        output_csv_path: str, output_jpg_path: str,
        properties: dict = None, **kwargs) -> int:
    """Create an :class:`IntraSequenceCorrelation <intrabp_correlations.intraseqcorr.IntraSequenceCorrelation>` object and
    execute its :meth:`launch() <intrabp_correlations.intraseqcorr.IntraSequenceCorrelation.launch>` method.

    All arguments are forwarded unchanged to the class constructor; the
    return value is whatever ``launch()`` returns.
    """
    block = IntraSequenceCorrelation(
        input_ser_path=input_ser_path,
        output_csv_path=output_csv_path,
        output_jpg_path=output_jpg_path,
        properties=properties,
        **kwargs)
    return block.launch()

208 

209 

def main():
    """Command line entry point: parse arguments, load the configuration and run the block.

    Please check the command line documentation.
    """
    def wide_formatter(prog):
        # Very large width so help strings are never wrapped.
        return argparse.RawTextHelpFormatter(prog, width=99999)

    cli = argparse.ArgumentParser(
        description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.',
        formatter_class=wide_formatter)
    cli.add_argument('--config', required=False, help='Configuration file')

    # The three I/O paths are mandatory and registered in a fixed order.
    mandatory = cli.add_argument_group('required arguments')
    io_flags = (
        ('--input_ser_path',
         'Path to ser file with inputs. Accepted formats: ser.'),
        ('--output_csv_path',
         'Path to output file. Accepted formats: csv.'),
        ('--output_jpg_path',
         'Path to output plot. Accepted formats: jpg.'),
    )
    for flag, help_text in io_flags:
        mandatory.add_argument(flag, required=True, help=help_text)

    namespace = cli.parse_args()
    # Fall back to an empty JSON object when no configuration is given.
    config = namespace.config or "{}"
    props = settings.ConfReader(config=config).get_prop_dic()

    intraseqcorr(
        input_ser_path=namespace.input_ser_path,
        output_csv_path=namespace.output_csv_path,
        output_jpg_path=namespace.output_jpg_path,
        properties=props)

233 

234 

# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()