Coverage for biobb_ml/dimensionality_reduction/pls_components.py: 72%

149 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:39 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the PLSComponents class and the command line interface.""" 

4import argparse 

5import warnings 

6import pandas as pd 

7import numpy as np 

8import matplotlib.pyplot as plt 

9from biobb_common.generic.biobb_object import BiobbObject 

10from scipy.signal import savgol_filter 

11from sys import stdout 

12from sklearn.cross_decomposition import PLSRegression 

13from sklearn.model_selection import cross_val_predict 

14from sklearn.metrics import mean_squared_error, r2_score 

15from biobb_common.configuration import settings 

16from biobb_common.tools import file_utils as fu 

17from biobb_common.tools.file_utils import launchlogger 

18from biobb_ml.dimensionality_reduction.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, getTarget, getTargetValue, getWindowLength 

19 

20 

class PLSComponents(BiobbObject):
    """
    | biobb_ml PLSComponents
    | Wrapper of the scikit-learn PLSRegression method.
    | Calculates best components number for a Partial Least Square (PLS) Regression. Visit the `PLSRegression documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cross_decomposition.PLSRegression.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/dimensionality_reduction/dataset_pls_components.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Table with R2 and MSE for calibration and cross-validation data for the best number of components. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/dimensionality_reduction/ref_output_results_pls_components.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the Mean Square Error plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/dimensionality_reduction/ref_output_plot_pls_components.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **features** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **target** (*dict*) - ({}) Dependent variable you want to predict from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked.
            * **optimise** (*boolean*) - (False) Whether or not optimise the process of MSE calculation. Beware, if True selected, the process can take a long time depending on the **max_components** value.
            * **max_components** (*int*) - (10) [1~1000|1] Maximum number of components to use by default for PLS queries.
            * **cv** (*int*) - (10) [1~10000|1] Specify the number of folds in the cross-validation splitting strategy. Value must be between 2 and number of samples in the dataset.
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.dimensionality_reduction.pls_components import pls_components
            prop = {
                'features': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'target': {
                    'column': 'target'
                },
                'max_components': 10,
                'cv': 10
            }
            pls_components(input_dataset_path='/path/to/myDataset.csv',
                           output_results_path='/path/to/newTable.csv',
                           output_plot_path='/path/to/newPlot.png',
                           properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn PLSRegression
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor arguments; taken before any other local
        # is created so its content stays the same as it has always been.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.features = properties.get('features', [])
        self.target = properties.get('target', '')
        self.optimise = properties.get('optimise', False)
        self.max_components = properties.get('max_components', 10)
        self.cv = properties.get('cv', 10)
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
        # The plot is optional: only validate its path when one was given
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    def warn(*args, **kwargs):
        """ No-op callable kept for backward compatibility.

        It used to be installed as ``warnings.warn`` by :meth:`launch`;
        warnings are now silenced with a scoped ``warnings.catch_warnings()``
        instead, so this method is no longer used by the class itself.
        """
        pass

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`PLSComponents <dimensionality_reduction.pls_components.PLSComponents>` dimensionality_reduction.pls_components.PLSComponents object."""

        # Silence the warnings raised during the many fitting iterations, but
        # only while this block runs: the previous approach replaced
        # warnings.warn globally and never restored it, which muted warnings
        # for the rest of the host process.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return self._run()

    def _run(self) -> int:
        """ Runs the whole workflow: load data, scan components, write outputs.

        Returns:
            int: 0 on completion (also when the restart check skips the run).
        """
        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        data = self._load_dataset()

        # declare inputs, targets and weights
        # the inputs are all the features
        features = getIndependentVars(self.features, data, self.out_log, self.__class__.__name__)
        fu.log('Features: [%s]' % (getIndependentVarsList(self.features)), self.out_log, self.global_log)
        # target
        y = getTarget(self.target, data, self.out_log, self.__class__.__name__)
        fu.log('Target: %s' % (getTargetValue(self.target)), self.out_log, self.global_log)

        if self.scale:
            fu.log('Scaling selected', self.out_log, self.global_log)

        if self.optimise:
            X, mse, best_c = self._scan_components_optimised(features, y)
        else:
            X, mse, best_c = self._scan_components(features, y)

        # mse table
        results_table = pd.DataFrame(data={'component': np.arange(1, self.max_components + 1), 'MSE': mse})
        fu.log('Gathering results\n\nMSE TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)

        fu.log('Calculating scores and coefficients for best number of components = %d according to the MSE Method' % best_c, self.out_log, self.global_log)

        # define PLS object with optimal number of components
        # NOTE(review): unlike the scan above, `scale` is not forwarded here, so
        # the scikit-learn default applies. Left unchanged because forwarding it
        # would alter the reported R2/MSE values - confirm the intended behaviour.
        model = PLSRegression(n_components=best_c)
        # fit to the entire dataset
        model.fit(X, y)
        y_c = model.predict(X)
        # cross-validation
        y_cv = cross_val_predict(model, X, y, cv=self.cv)
        # calculate scores for calibration and cross-validation
        score_c = r2_score(y, y_c)
        score_cv = r2_score(y, y_cv)
        # calculate mean squared error for calibration and cross validation
        mse_c = mean_squared_error(y, y_c)
        mse_cv = mean_squared_error(y, y_cv)
        # create scores table
        r2_table = pd.DataFrame()
        r2_table["feature"] = ['R2 calib', 'R2 CV', 'MSE calib', 'MSE CV']
        r2_table['coefficient'] = [score_c, score_cv, mse_c, mse_cv]

        fu.log('Generating scores table\n\nR2 & MSE TABLE\n\n%s\n' % r2_table, self.out_log, self.global_log)

        # save results table
        fu.log('Saving R2 & MSE table to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        r2_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        # mse plot
        if self.io_dict["out"]["output_plot_path"]:
            self._save_mse_plot(mse, best_c)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

    def _load_dataset(self):
        """ Reads the input dataset into a pandas DataFrame.

        When the features are selected by column name the first row of the
        file is used as header (read through ``getHeader``) and skipped;
        otherwise every row is read as data and columns are left unnamed.
        """
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.features:
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        return pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

    @staticmethod
    def _report_progress(done, total):
        """ Rewrites the progress line on stdout as ``done`` out of ``total`` steps. """
        stdout.write("\r%d%% completed" % min(100 * done / total, 100))
        stdout.flush()

    @staticmethod
    def _first_target_coefficients(coef, n_features):
        """ Returns the regression coefficients of the first target as a 1-D array.

        scikit-learn stored ``PLSRegression.coef_`` as ``(n_features, n_targets)``
        before version 1.3 and as ``(n_targets, n_features)`` from 1.3 on. The
        layout is told apart by shape; a square matrix is read with the current
        layout.
        """
        coef = np.asarray(coef)
        if coef.ndim == 1:
            return coef
        if coef.shape[0] == n_features and coef.shape[1] != n_features:
            return coef[:, 0]
        return coef[0, :]

    def _scan_components(self, features, y):
        """ Cross-validates one PLS model per number of components.

        Args:
            features: independent variables, used as they are.
            y: target values.

        Returns:
            tuple: ``(X, mse, best_c)`` - the matrix used for fitting, the
            cross-validation MSE per number of components and the number of
            components with the lowest MSE.
        """
        # run PLS from 1 to max_components
        fu.log('Calculating MSE for each %d components' % self.max_components, self.out_log, self.global_log)

        X = features

        mse = []
        stdout.write("\r0% completed")
        for n_comp in range(1, self.max_components + 1):
            pls = PLSRegression(n_components=n_comp, scale=self.scale)
            # Cross-validation
            y_cv = cross_val_predict(pls, X, y, cv=self.cv)
            mse.append(mean_squared_error(y, y_cv))
            # n_comp is already 1-based, so it is the number of finished steps
            self._report_progress(n_comp, self.max_components)
        print()

        # calculate the position of minimum in MSE
        best_c = int(np.argmin(mse)) + 1
        return X, mse, best_c

    def _scan_components_optimised(self, features, y):
        """ Scans the number of components while discarding weak variables.

        The data is first replaced by its second derivative (Savitzky-Golay
        filter). Then, for every number of components, the variables are sorted
        by the absolute value of their PLS coefficient and dropped one at a
        time from the weakest, cross-validating each remaining subset.

        Args:
            features: independent variables.
            y: target values.

        Returns:
            tuple: ``(X, mse, best_c)`` - the filtered matrix, the lowest
            cross-validation MSE found per number of components (NaN when no
            subset could be evaluated) and the number of components holding
            the overall lowest MSE.

        Raises:
            ValueError: if no variable subset could be evaluated at all.
        """
        # get rid of baseline and linear variations calculating second derivative
        fu.log('Performing second derivative on the data', self.out_log, self.global_log)
        self.window_length = getWindowLength(17, features.shape[1])
        X = savgol_filter(features, window_length=self.window_length, polyorder=2, deriv=2)

        # run PLS from 1 to max_components
        fu.log('Calculating MSE for each %d components' % self.max_components, self.out_log, self.global_log)

        mse = []
        # MSE per (number of components, number of discarded variables).
        # NaN marks the combinations that are never evaluated, so that a
        # genuine MSE of 0 is not mistaken for an empty cell.
        msep = np.full((self.max_components, X.shape[1]), np.nan)
        # Loop over the number of PLS components
        stdout.write("\r0% completed")
        for i in range(self.max_components):
            n_comp = i + 1

            # Regression with specified number of components, using full spectrum
            pls1 = PLSRegression(n_components=n_comp, scale=self.scale)
            pls1.fit(X, y)

            # Indices of sort spectra according to ascending absolute value of PLS coefficients
            coefficients = self._first_target_coefficients(pls1.coef_, X.shape[1])
            sorted_ind = np.argsort(np.abs(coefficients))
            # Sort spectra accordingly
            Xc = X[:, sorted_ind]

            # Discard one wavelength at a time of the sorted spectra,
            # regress, and calculate the MSE cross-validation
            n_subsets = max(Xc.shape[1] - n_comp, 0)
            for j in range(n_subsets):
                # No explicit fit: cross_val_predict fits its own clones.
                # NOTE(review): `scale` is not forwarded to this model, unlike
                # pls1 - kept as it was, confirm the intended behaviour.
                pls2 = PLSRegression(n_components=n_comp)
                y_cv = cross_val_predict(pls2, Xc[:, j:], y, cv=self.cv)
                msep[i, j] = mean_squared_error(y, y_cv)

            # Keep the lowest cross-validated MSE reached with this number of
            # components; this is what the MSE table and plot report.
            mse.append(msep[i, :n_subsets].min() if n_subsets else np.nan)

            self._report_progress(n_comp, self.max_components)
        print()

        if np.isnan(msep).all():
            raise ValueError('No variable subset could be evaluated: the dataset needs more features than PLS components')

        # Calculate the position of minimum in MSE
        best_row, _ = np.unravel_index(np.nanargmin(msep), msep.shape)
        best_c = int(best_row) + 1
        return X, mse, best_c

    def _save_mse_plot(self, mse, best_c):
        """ Saves the MSE versus number of components plot, marking ``best_c``. """
        fu.log('Saving MSE plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
        number_components = range(1, self.max_components + 1)
        figure = plt.figure()
        try:
            plt.title('PLS', size=15)
            plt.plot(number_components, mse, '-o')
            plt.ylabel('MSE')
            plt.xlabel('Number of PLS Components')
            plt.axvline(x=best_c, c='red')
            plt.tight_layout()

            plt.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)
        finally:
            # Release the figure: pyplot keeps every open figure alive otherwise
            plt.close(figure)

273 

274 

def pls_components(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Build a :class:`PLSComponents <dimensionality_reduction.pls_components.PLSComponents>` object with the given
    paths and properties, run its :meth:`launch() <dimensionality_reduction.pls_components.PLSComponents.launch>`
    method and return the value it returns."""

    building_block = PLSComponents(
        input_dataset_path=input_dataset_path,
        output_results_path=output_results_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs
    )
    return building_block.launch()

283 

284 

def main():
    """Command line entry point of this building block: parses the arguments, reads the configuration and runs it."""

    def wide_formatter(prog):
        # Very large width so that help texts are never wrapped
        return argparse.RawTextHelpFormatter(prog, width=99999)

    cli = argparse.ArgumentParser(description="Wrapper of the scikit-learn PLSRegression method.", formatter_class=wide_formatter)
    cli.add_argument('--config', required=False, help='Configuration file')

    # Arguments specific to this building block
    mandatory = cli.add_argument_group('required arguments')
    mandatory.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
    mandatory.add_argument('--output_results_path', required=True, help='Table with R2 and MSE for calibration and cross-validation data for the best number of components. Accepted formats: csv.')
    cli.add_argument('--output_plot_path', required=False, help='Path to the Mean Square Error plot. Accepted formats: png.')

    parsed = cli.parse_args()
    # Without a configuration file, fall back to an empty JSON object
    parsed.config = parsed.config or "{}"
    properties = settings.ConfReader(config=parsed.config).get_prop_dic()

    # Run the building block with the parsed paths and properties
    pls_components(
        input_dataset_path=parsed.input_dataset_path,
        output_results_path=parsed.output_results_path,
        output_plot_path=parsed.output_plot_path,
        properties=properties,
    )

305 

306 

# Run the command line interface only when executed as a script, not on import
if __name__ == '__main__':
    main()