Coverage for biobb_ml/dimensionality_reduction/principal_component.py: 81%

101 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the PrincipalComponentAnalysis class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6import numpy as np 

7import matplotlib.pyplot as plt 

8from biobb_common.generic.biobb_object import BiobbObject 

9from sklearn.preprocessing import StandardScaler 

10from sklearn.decomposition import PCA 

11from biobb_common.configuration import settings 

12from biobb_common.tools import file_utils as fu 

13from biobb_common.tools.file_utils import launchlogger 

14from biobb_ml.dimensionality_reduction.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, getTargetValue, generate_columns_labels, PCA2CPlot, PCA3CPlot 

15 

16 

17class PrincipalComponentAnalysis(BiobbObject): 

18 """ 

19 | biobb_ml PrincipalComponentAnalysis 

20 | Wrapper of the scikit-learn PCA method. 

21 | Analyses a given dataset through Principal Component Analysis (PCA). Visit the `PCA documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html>`_ in the sklearn official website for further information. 

22 

23 Args: 

24 input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/dimensionality_reduction/dataset_principal_component.csv>`_. Accepted formats: csv (edam:format_3752). 

25 output_results_path (str): Path to the analysed dataset. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/dimensionality_reduction/ref_output_results_principal_component.csv>`_. Accepted formats: csv (edam:format_3752). 

26 output_plot_path (str) (Optional): Path to the Principal Component plot, only if number of components is 2 or 3. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/dimensionality_reduction/ref_output_plot_principal_component.png>`_. Accepted formats: png (edam:format_3603). 

27 properties (dic - Python dictionary object containing the tool parameters, not input/output files): 

28 * **features** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of mulitple formats, the first one will be picked. 

29 * **target** (*dict*) - ({}) Dependent variable you want to predict from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of mulitple formats, the first one will be picked. 

30 * **n_components** (*dict*) - ({}) Dictionary containing the number of components to keep (int) or the minimum number of principal components such the 0 to 1 range of the variance (float) is retained. If not set ({}) all components are kept. Formats for integer values: { "value": 2 } or for float values: { "value": 0.3 } 

31 * **random_state_method** (*int*) - (5) [1~1000|1] Controls the randomness of the estimator. 

32 * **scale** (*bool*) - (False) Whether or not to scale the input dataset. 

33 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

34 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

35 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

36 

37 Examples: 

38 This is a use example of how to use the building block from Python:: 

39 

40 from biobb_ml.dimensionality_reduction.principal_component import principal_component 

41 prop = { 

42 'features': { 

43 'columns': [ 'column1', 'column2', 'column3' ] 

44 }, 

45 'target': { 

46 'column': 'target' 

47 }, 

48 'n_components': { 

49 'int': 2 

50 } 

51 } 

52 principal_component(input_dataset_path='/path/to/myDataset.csv', 

53 output_results_path='/path/to/newTable.csv', 

54 output_plot_path='/path/to/newPlot.png', 

55 properties=prop) 

56 

57 Info: 

58 * wrapped_software: 

59 * name: scikit-learn PCA 

60 * version: >=0.24.2 

61 * license: BSD 3-Clause 

62 * ontology: 

63 * name: EDAM 

64 * schema: http://edamontology.org/EDAM.owl 

65 

66 """ 

67 

68 def __init__(self, input_dataset_path, output_results_path, 

69 output_plot_path=None, properties=None, **kwargs) -> None: 

70 properties = properties or {} 

71 

72 # Call parent class constructor 

73 super().__init__(properties) 

74 self.locals_var_dict = locals().copy() 

75 

76 # Input/Output files 

77 self.io_dict = { 

78 "in": {"input_dataset_path": input_dataset_path}, 

79 "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path} 

80 } 

81 

82 # Properties specific for BB 

83 self.features = properties.get('features', {}) 

84 self.target = properties.get('target', {}) 

85 self.n_components = properties.get('n_components', {}) 

86 self.random_state_method = properties.get('random_state_method', 5) 

87 self.scale = properties.get('scale', False) 

88 self.properties = properties 

89 

90 # Check the properties 

91 self.check_properties(properties) 

92 self.check_arguments() 

93 

94 def check_data_params(self, out_log, err_log): 

95 """ Checks all the input/output paths and parameters """ 

96 self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__) 

97 self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__) 

98 if self.io_dict["out"]["output_plot_path"]: 

99 self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__) 

100 

101 @launchlogger 

102 def launch(self) -> int: 

103 """Execute the :class:`PrincipalComponentAnalysis <dimensionality_reduction.principal_component.PrincipalComponentAnalysis>` dimensionality_reduction.pincipal_component.PrincipalComponentAnalysis object.""" 

104 

105 # check input/output paths and parameters 

106 self.check_data_params(self.out_log, self.err_log) 

107 

108 # Setup Biobb 

109 if self.check_restart(): 

110 return 0 

111 self.stage_files() 

112 

113 # load dataset 

114 fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log) 

115 if 'columns' in self.features: 

116 labels = getHeader(self.io_dict["in"]["input_dataset_path"]) 

117 skiprows = 1 

118 else: 

119 labels = None 

120 skiprows = None 

121 data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels) 

122 

123 # declare inputs, targets and weights 

124 # the inputs are all the features 

125 features = getIndependentVars(self.features, data, self.out_log, self.__class__.__name__) 

126 fu.log('Features: [%s]' % (getIndependentVarsList(self.features)), self.out_log, self.global_log) 

127 # target 

128 y_value = getTargetValue(self.target) 

129 fu.log('Target: %s' % (y_value), self.out_log, self.global_log) 

130 

131 if self.scale: 

132 fu.log('Scaling dataset', self.out_log, self.global_log) 

133 scaler = StandardScaler() 

134 features = scaler.fit_transform(features) 

135 

136 # create a PCA object with self.n_components['value'] n_components 

137 if 'value' not in self.n_components: 

138 n_c = None 

139 else: 

140 n_c = self.n_components['value'] 

141 fu.log('Fitting dataset', self.out_log, self.global_log) 

142 model = PCA(n_components=n_c, random_state=self.random_state_method) 

143 # fit the data 

144 model.fit(features) 

145 

146 # calculate variance ratio 

147 v_ratio = model.explained_variance_ratio_ 

148 fu.log('Variance ratio for %d Principal Components: %s' % (v_ratio.shape[0], np.array2string(v_ratio, precision=3, separator=', ')), self.out_log, self.global_log) 

149 

150 # transform 

151 fu.log('Transforming dataset', self.out_log, self.global_log) 

152 pca = model.transform(features) 

153 pca = pd.DataFrame(data=pca, columns=generate_columns_labels('PC', v_ratio.shape[0])) 

154 

155 if 'columns' in self.features: 

156 d = data[[y_value]] 

157 target_plot = y_value 

158 else: 

159 d = data.loc[:, int(y_value)] 

160 target_plot = int(y_value) 

161 

162 # output results 

163 pca_table = pd.concat([pca, d], axis=1) 

164 fu.log('Calculating PCA for dataset\n\n%d COMPONENT PCA TABLE\n\n%s\n' % (v_ratio.shape[0], pca_table), self.out_log, self.global_log) 

165 

166 # save results 

167 fu.log('Saving data to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log) 

168 pca_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True) 

169 

170 # create output plot 

171 if (self.io_dict["out"]["output_plot_path"]): 

172 if v_ratio.shape[0] > 3: 

173 fu.log('%d PC\'s found. Displaying only 1st, 2nd and 3rd PC' % v_ratio.shape[0], self.out_log, self.global_log) 

174 fu.log('Saving PC plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log) 

175 targets = np.unique(d) 

176 if v_ratio.shape[0] == 2: 

177 PCA2CPlot(pca_table, targets, target_plot) 

178 

179 if v_ratio.shape[0] >= 3: 

180 PCA3CPlot(pca_table, targets, target_plot) 

181 

182 plt.savefig(self.io_dict["out"]["output_plot_path"], dpi=150) 

183 

184 # Copy files to host 

185 self.copy_to_host() 

186 

187 self.tmp_files.extend([ 

188 self.stage_io_dict.get("unique_dir") 

189 ]) 

190 self.remove_tmp_files() 

191 

192 self.check_arguments(output_files_created=True, raise_exception=False) 

193 

194 return 0 

195 

196 

197def principal_component(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int: 

198 """Execute the :class:`PrincipalComponentAnalysis <dimensionality_reduction.principal_component.PrincipalComponentAnalysis>` class and 

199 execute the :meth:`launch() <dimensionality_reduction.principal_component.PrincipalComponentAnalysis.launch>` method.""" 

200 

201 return PrincipalComponentAnalysis(input_dataset_path=input_dataset_path, 

202 output_results_path=output_results_path, 

203 output_plot_path=output_plot_path, 

204 properties=properties, **kwargs).launch() 

205 

206 

207def main(): 

208 """Command line execution of this building block. Please check the command line documentation.""" 

209 parser = argparse.ArgumentParser(description="Wrapper of the scikit-learn PCA method.", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

210 parser.add_argument('--config', required=False, help='Configuration file') 

211 

212 # Specific args of each building block 

213 required_args = parser.add_argument_group('required arguments') 

214 required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.') 

215 required_args.add_argument('--output_results_path', required=True, help='Path to the analysed dataset. Accepted formats: csv.') 

216 parser.add_argument('--output_plot_path', required=False, help='Path to the Principal Component plot, only if number of components is 2 or 3. Accepted formats: png.') 

217 

218 args = parser.parse_args() 

219 args.config = args.config or "{}" 

220 properties = settings.ConfReader(config=args.config).get_prop_dic() 

221 

222 # Specific call of each building block 

223 principal_component(input_dataset_path=args.input_dataset_path, 

224 output_results_path=args.output_results_path, 

225 output_plot_path=args.output_plot_path, 

226 properties=properties) 

227 

228 

229if __name__ == '__main__': 

230 main()