Coverage for biobb_ml/dimensionality_reduction/pls_components.py: 72%
148 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-03 14:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-03 14:57 +0000
1#!/usr/bin/env python3
3"""Module containing the PLSComponents class and the command line interface."""
4import argparse
5import warnings
6import pandas as pd
7import numpy as np
8import matplotlib.pyplot as plt
9from biobb_common.generic.biobb_object import BiobbObject
10from scipy.signal import savgol_filter
11from sys import stdout
12from sklearn.cross_decomposition import PLSRegression
13from sklearn.model_selection import cross_val_predict
14from sklearn.metrics import mean_squared_error, r2_score
15from biobb_common.configuration import settings
16from biobb_common.tools import file_utils as fu
17from biobb_common.tools.file_utils import launchlogger
18from biobb_ml.dimensionality_reduction.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, getTarget, getTargetValue, getWindowLength
class PLSComponents(BiobbObject):
    """
    | biobb_ml PLSComponents
    | Wrapper of the scikit-learn PLSRegression method.
    | Calculates best components number for a Partial Least Square (PLS) Regression. Visit the `PLSRegression documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cross_decomposition.PLSRegression.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/dimensionality_reduction/dataset_pls_components.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Table with R2 and MSE for calibration and cross-validation data for the best number of components. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/dimensionality_reduction/ref_output_results_pls_components.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the Mean Square Error plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/dimensionality_reduction/ref_output_plot_pls_components.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **features** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **target** (*dict*) - ({}) Dependent variable you want to predict from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked.
            * **optimise** (*boolean*) - (False) Whether or not optimise the process of MSE calculation. Beware, if True selected, the process can take a long time depending on the **max_components** value.
            * **max_components** (*int*) - (10) [1~1000|1] Maximum number of components to use by default for PLS queries.
            * **cv** (*int*) - (10) [1~10000|1] Specify the number of folds in the cross-validation splitting strategy. Value must be between 2 and number of samples in the dataset.
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.dimensionality_reduction.pls_components import pls_components
            prop = {
                'features': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'target': {
                    'column': 'target'
                },
                'max_components': 10,
                'cv': 10
            }
            pls_components(input_dataset_path='/path/to/myDataset.csv',
                           output_results_path='/path/to/newTable.csv',
                           output_plot_path='/path/to/newPlot.png',
                           properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn PLSRegression
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor arguments; taken before any other local
        # is created so that it only holds the call arguments.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.features = properties.get('features', [])
        self.target = properties.get('target', '')
        self.optimise = properties.get('optimise', False)
        self.max_components = properties.get('max_components', 10)
        self.cv = properties.get('cv', 10)
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        class_name = self.__class__.__name__
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, class_name)
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, class_name)
        # The plot is optional: only validate its path when one was given
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, class_name)

    def warn(*args, **kwargs):
        """No-op that accepts any arguments.

        Kept for backward compatibility only: launch() no longer installs it
        as a replacement for ``warnings.warn``.
        """
        pass

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`PLSComponents <dimensionality_reduction.pls_components.PLSComponents>` dimensionality_reduction.pls_components.PLSComponents object."""

        # Silence warnings raised during the repeated fits, but only for the
        # duration of this run: catch_warnings() restores the previous warning
        # configuration on exit instead of replacing warnings.warn for the
        # whole process.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return self._compute_pls_components()

    def _compute_pls_components(self) -> int:
        """Run the whole workflow: load data, scan components, score, save outputs."""

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        data = self._load_dataset()

        # declare inputs, targets and weights
        # the inputs are all the features
        features = getIndependentVars(self.features, data, self.out_log, self.__class__.__name__)
        fu.log('Features: [%s]' % (getIndependentVarsList(self.features)), self.out_log, self.global_log)
        # target
        y = getTarget(self.target, data, self.out_log, self.__class__.__name__)
        fu.log('Target: %s' % (getTargetValue(self.target)), self.out_log, self.global_log)

        if self.scale:
            fu.log('Scaling selected', self.out_log, self.global_log)

        if self.optimise:
            X, mse, best_c = self._optimised_mse(features, y)
        else:
            X, mse, best_c = self._plain_mse(features, y)

        # mse table
        results_table = pd.DataFrame(data={'component': np.arange(1, self.max_components + 1), 'MSE': mse})
        fu.log('Gathering results\n\nMSE TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)

        fu.log('Calculating scores and coefficients for best number of components = %d according to the MSE Method' % best_c, self.out_log, self.global_log)

        # define PLS object with optimal number of components
        # NOTE(review): `scale` is not forwarded here, so this model uses
        # PLSRegression's own default rather than self.scale. Left unchanged
        # because it determines the values written to the results file.
        model = PLSRegression(n_components=best_c)
        # fit to the entire dataset
        model.fit(X, y)
        y_c = model.predict(X)
        # cross-validation
        y_cv = cross_val_predict(model, X, y, cv=self.cv)
        # calculate scores for calibration and cross-validation
        score_c = r2_score(y, y_c)
        score_cv = r2_score(y, y_cv)
        # calculate mean squared error for calibration and cross validation
        mse_c = mean_squared_error(y, y_c)
        mse_cv = mean_squared_error(y, y_cv)
        # create scores table
        r2_table = pd.DataFrame()
        r2_table["feature"] = ['R2 calib', 'R2 CV', 'MSE calib', 'MSE CV']
        r2_table['coefficient'] = [score_c, score_cv, mse_c, mse_cv]

        fu.log('Generating scores table\n\nR2 & MSE TABLE\n\n%s\n' % r2_table, self.out_log, self.global_log)

        # save results table
        fu.log('Saving R2 & MSE table to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        r2_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        # mse plot
        if self.io_dict["out"]["output_plot_path"]:
            self._save_mse_plot(mse, best_c)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

    def _load_dataset(self):
        """Read the input CSV into a DataFrame."""
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.features:
            # Features are addressed by name: take the names from the header
            # row and skip that row when reading the values.
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        return pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

    def _optimised_mse(self, features, y):
        """Scan component counts while discarding the least relevant variables.

        Returns a tuple ``(X, mse, best_c)``: the filtered data used for the
        fits, the lowest cross-validated MSE found for each number of
        components, and the number of components holding the overall minimum.

        Raises:
            ValueError: if no cross-validation could be evaluated at all.
        """
        # get rid of baseline and linear variations calculating second derivative
        fu.log('Performing second derivative on the data', self.out_log, self.global_log)
        self.window_length = getWindowLength(17, features.shape[1])
        X = savgol_filter(features, window_length=self.window_length, polyorder=2, deriv=2)

        # run PLS from 1 to max_components
        fu.log('Calculating MSE for each %d components' % self.max_components, self.out_log, self.global_log)

        mse = []
        # msep[i, j]: CV MSE with i+1 components after dropping the j least
        # relevant variables; cells that are never evaluated stay at 0.
        msep = np.zeros((self.max_components, X.shape[1]))
        stdout.write("\r0% completed")
        for i in range(self.max_components):
            n_comp = i + 1

            # Regression with specified number of components, using full spectrum
            pls1 = PLSRegression(n_components=n_comp, scale=self.scale)
            pls1.fit(X, y)

            # Indices of sort spectra according to ascending absolute value of PLS coefficients
            coefficients = self._first_target_coefficients(pls1, X.shape[1])
            sorted_ind = np.argsort(np.abs(coefficients))
            # Sort spectra accordingly
            Xc = X[:, sorted_ind]
            # Discard one wavelength at a time of the sorted spectra,
            # regress, and calculate the MSE cross-validation
            for j in range(Xc.shape[1] - n_comp):
                # NOTE(review): `scale` is not forwarded to this model either.
                pls2 = PLSRegression(n_components=n_comp)
                pls2.fit(Xc[:, j:], y)

                y_cv = cross_val_predict(pls2, Xc[:, j:], y, cv=self.cv)
                msep[i, j] = mean_squared_error(y, y_cv)

            # Record the best MSE reached with this number of components.
            # The previous code stored a column index here, which was then
            # reported and plotted as if it were an MSE.
            row = msep[i]
            evaluated = row[np.nonzero(row)]
            mse.append(np.min(evaluated) if evaluated.size else np.nan)

            self._write_progress(n_comp, self.max_components)
        print()

        # Calculate the position of minimum in MSE
        evaluated = msep[np.nonzero(msep)]
        if evaluated.size == 0:
            raise ValueError('No cross-validated MSE could be computed: max_components leaves no features to evaluate')
        mseminx, _ = np.where(msep == np.min(evaluated))
        best_c = mseminx[0] + 1

        return X, mse, best_c

    def _plain_mse(self, features, y):
        """Scan component counts on the unmodified features.

        Returns a tuple ``(X, mse, best_c)`` with the data used for the fits,
        the cross-validated MSE per number of components, and the number of
        components with the lowest MSE.
        """
        # run PLS from 1 to max_components
        fu.log('Calculating MSE for each %d components' % self.max_components, self.out_log, self.global_log)

        X = features

        mse = []
        stdout.write("\r0% completed")
        for n_comp in range(1, self.max_components + 1):
            pls = PLSRegression(n_components=n_comp, scale=self.scale)
            # Cross-validation
            y_cv = cross_val_predict(pls, X, y, cv=self.cv)
            mse.append(mean_squared_error(y, y_cv))
            # n_comp is already 1-based, so it is the number of finished steps
            self._write_progress(n_comp, self.max_components)
        print()
        # calculate the position of minimum in MSE
        best_c = np.argmin(mse) + 1

        return X, mse, best_c

    @staticmethod
    def _first_target_coefficients(model, n_features):
        """Return the fitted coefficients of the first target, one per feature.

        scikit-learn releases differ in the orientation of ``coef_``
        (features x targets versus targets x features), so the array is
        oriented by comparing its first dimension with ``n_features``.
        The two layouts cannot be told apart when both dimensions are equal;
        in that case the array is used as-is.
        """
        coef = np.asarray(model.coef_)
        if coef.ndim == 1:
            return coef
        if coef.shape[0] != n_features:
            coef = coef.T
        return coef[:, 0]

    @staticmethod
    def _write_progress(done, total):
        """Rewrite the current stdout line with the completed percentage."""
        # Trick to update status on the same line
        stdout.write("\r%d%% completed" % min(100, 100 * done / total))
        stdout.flush()

    def _save_mse_plot(self, mse, best_c):
        """Plot MSE against the number of components and save it as an image."""
        fu.log('Saving MSE plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
        number_clusters = range(1, self.max_components + 1)
        fig = plt.figure()
        try:
            plt.title('PLS', size=15)
            plt.plot(number_clusters, mse, '-o')
            plt.ylabel('MSE')
            plt.xlabel('Number of PLS Components')
            plt.axvline(x=best_c, c='red')
            plt.tight_layout()

            plt.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)
        finally:
            # Release the figure so repeated launches do not accumulate them
            plt.close(fig)
def pls_components(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Build a :class:`PLSComponents <dimensionality_reduction.pls_components.PLSComponents>` object from the given
    arguments and run its :meth:`launch() <dimensionality_reduction.pls_components.PLSComponents.launch>` method,
    returning whatever launch() returns."""

    building_block = PLSComponents(
        input_dataset_path=input_dataset_path,
        output_results_path=output_results_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs
    )
    return building_block.launch()
def main():
    """Command line entry point: parse the arguments, read the configuration and run the building block."""
    parser = argparse.ArgumentParser(
        description="Wrapper of the scikit-learn PLSRegression method.",
        formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
    )
    parser.add_argument('--config', required=False, help='Configuration file')

    # Arguments specific to this building block; mandatory ones are grouped
    # so they are listed separately in the help output.
    mandatory = parser.add_argument_group('required arguments')
    mandatory_options = (
        ('--input_dataset_path', 'Path to the input dataset. Accepted formats: csv.'),
        ('--output_results_path', 'Table with R2 and MSE for calibration and cross-validation data for the best number of components. Accepted formats: csv.'),
    )
    for option, description in mandatory_options:
        mandatory.add_argument(option, required=True, help=description)
    parser.add_argument('--output_plot_path', required=False, help='Path to the Mean Square Error plot. Accepted formats: png.')

    args = parser.parse_args()
    # Without --config, hand the reader an empty JSON object
    config = args.config or "{}"
    properties = settings.ConfReader(config=config).get_prop_dic()

    # Run the building block with the parsed paths and properties
    pls_components(
        input_dataset_path=args.input_dataset_path,
        output_results_path=args.output_results_path,
        output_plot_path=args.output_plot_path,
        properties=properties,
    )
# Run the command line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()