Coverage for biobb_ml/classification/k_neighbors.py: 83%

150 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the KNeighborsTrain class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6import numpy as np 

7import joblib 

8from biobb_common.generic.biobb_object import BiobbObject 

9from sklearn.preprocessing import StandardScaler 

10from sklearn.model_selection import train_test_split 

11from sklearn.metrics import confusion_matrix, classification_report, log_loss 

12from sklearn.neighbors import KNeighborsClassifier 

13from biobb_common.configuration import settings 

14from biobb_common.tools import file_utils as fu 

15from biobb_common.tools.file_utils import launchlogger 

16from biobb_ml.classification.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, getTarget, getTargetValue, getWeight, plotMultipleCM, plotBinaryClassifier 

17 

18 

class KNeighborsTrain(BiobbObject):
    """
    | biobb_ml KNeighborsTrain
    | Wrapper of the scikit-learn KNeighborsClassifier method.
    | Trains and tests a given dataset and saves the model and scaler. Visit the `KNeighborsClassifier documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsClassifier.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/classification/dataset_k_neighbors.csv>`_. Accepted formats: csv (edam:format_3752).
        output_model_path (str): Path to the output model file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/classification/ref_output_model_k_neighbors.pkl>`_. Accepted formats: pkl (edam:format_3653).
        output_test_table_path (str) (Optional): Path to the test table file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/classification/ref_output_test_k_neighbors.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the statistics plot. If target is binary it shows confusion matrix, distributions of the predicted probabilities of both classes and ROC curve. If target is non-binary it shows confusion matrix. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/classification/ref_output_plot_k_neighbors.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **independent_vars** (*dict*) - ({}) Independent variables you want to train from your dataset. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **target** (*dict*) - ({}) Dependent variable you want to predict from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked.
            * **weight** (*dict*) - ({}) Weight variable from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked. KNeighborsClassifier does not support sample weights, so this column is not used for training.
            * **metric** (*string*) - ("minkowski") The distance metric to use for the tree. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), manhattan (Compute the Manhattan distance), chebyshev (Compute the Chebyshev distance), minkowski (Compute the Minkowski distance between two 1-D arrays), wminkowski (Compute the weighted Minkowski distance between two 1-D arrays), seuclidean (Return the standardized Euclidean distance between two 1-D arrays), mahalanobis (Compute the Mahalanobis distance between two 1-D arrays).
            * **n_neighbors** (*int*) - (6) [1~100|1] Number of neighbors to use by default for kneighbors queries.
            * **normalize_cm** (*bool*) - (False) Whether or not to normalize the confusion matrix.
            * **random_state_train_test** (*int*) - (5) [1~1000|1] Controls the shuffling applied to the data before applying the split.
            * **test_size** (*float*) - (0.2) [0~1|0.05] Represents the proportion of the dataset to include in the test split. It should be between 0.0 and 1.0.
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.classification.k_neighbors import k_neighbors
            prop = {
                'independent_vars': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'target': {
                    'column': 'target'
                },
                'n_neighbors': 6,
                'test_size': 0.2
            }
            k_neighbors(input_dataset_path='/path/to/myDataset.csv',
                        output_model_path='/path/to/newModel.pkl',
                        output_test_table_path='/path/to/newTable.csv',
                        output_plot_path='/path/to/newPlot.png',
                        properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn KNeighborsClassifier
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_model_path,
                 output_test_table_path=None, output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor's local names as they are at this point.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_model_path": output_model_path, "output_test_table_path": output_test_table_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.independent_vars = properties.get('independent_vars', {})
        self.target = properties.get('target', {})
        self.weight = properties.get('weight', {})
        self.metric = properties.get('metric', 'minkowski')
        self.n_neighbors = properties.get('n_neighbors', 6)
        self.normalize_cm = properties.get('normalize_cm', False)
        self.random_state_train_test = properties.get('random_state_train_test', 5)
        self.test_size = properties.get('test_size', 0.2)
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        # NOTE(review): the boolean passed to check_output_path presumably flags
        # an optional output -- confirm in biobb_ml.classification.common.
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_model_path"] = check_output_path(self.io_dict["out"]["output_model_path"], "output_model_path", False, out_log, self.__class__.__name__)
        # The two optional outputs are only checked when a path was given.
        if self.io_dict["out"]["output_test_table_path"]:
            self.io_dict["out"]["output_test_table_path"] = check_output_path(self.io_dict["out"]["output_test_table_path"], "output_test_table_path", True, out_log, self.__class__.__name__)
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    def _build_confusion_matrix(self, y_true, y_pred):
        """Compute the confusion matrix for the given labels.

        Returns a ``(matrix, title)`` tuple. When ``self.normalize_cm`` is set,
        every row of the matrix is divided by its row sum and the title says so.
        """
        matrix = confusion_matrix(y_true, y_pred)
        if self.normalize_cm:
            matrix = matrix.astype('float') / matrix.sum(axis=1)[:, np.newaxis]
            return matrix, 'NORMALIZED CONFUSION MATRIX'
        return matrix, 'CONFUSION MATRIX, WITHOUT NORMALIZATION'

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`KNeighborsTrain <classification.k_neighbors.KNeighborsTrain>` classification.k_neighbors.KNeighborsTrain object.

        Loads the dataset, splits it into train and test sets, optionally scales
        it, fits a KNeighborsClassifier, logs the classification report, log loss
        and confusion matrix for both sets, writes the optional test table and
        plot, and finally dumps the model (plus scaler and variables) to
        ``output_model_path``. Returns 0.
        """

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.independent_vars:
            # Columns are addressed by name: take the names from the header
            # and skip that first row when reading the data.
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # declare inputs, targets and weights
        # the inputs are all the independent variables
        X = getIndependentVars(self.independent_vars, data, self.out_log, self.__class__.__name__)
        fu.log('Independent variables: [%s]' % (getIndependentVarsList(self.independent_vars)), self.out_log, self.global_log)
        # target
        y = getTarget(self.target, data, self.out_log, self.__class__.__name__)
        fu.log('Target: %s' % (getTargetValue(self.target)), self.out_log, self.global_log)
        # weights
        if self.weight:
            # The column is still resolved (presumably validating the weight
            # specification -- confirm in classification.common), but
            # KNeighborsClassifier.fit(X, y) takes no sample weights, so passing
            # them would raise a TypeError. They are therefore not used.
            getWeight(self.weight, data, self.out_log, self.__class__.__name__)
            fu.log('Weight column provided', self.out_log, self.global_log)
            fu.log('WARNING: KNeighborsClassifier does not support sample weights, the weight column will be ignored for training', self.out_log, self.global_log)

        # train / test split
        fu.log('Creating train and test sets', self.out_log, self.global_log)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=self.test_size, random_state=self.random_state_train_test)

        # scale dataset
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)

        # classification
        fu.log('Training dataset applying k neighbors classification', self.out_log, self.global_log)
        # The metric property is forwarded so the user's choice takes effect;
        # the default 'minkowski' matches scikit-learn's own default.
        model = KNeighborsClassifier(n_neighbors=self.n_neighbors, metric=self.metric)
        model.fit(X_train, y_train)

        y_hat_train = model.predict(X_train)
        # classification report
        cr_train = classification_report(y_train, y_hat_train)
        # log loss
        yhat_prob_train = model.predict_proba(X_train)
        l_loss_train = log_loss(y_train, yhat_prob_train)
        fu.log('Calculating scores and report for training dataset\n\nCLASSIFICATION REPORT\n\n%s\nLog loss: %.3f\n' % (cr_train, l_loss_train), self.out_log, self.global_log)

        # compute confusion matrix
        np.set_printoptions(precision=2)
        cnf_matrix_train, cm_type = self._build_confusion_matrix(y_train, y_hat_train)
        fu.log('Calculating confusion matrix for training dataset\n\n%s\n\n%s\n' % (cm_type, cnf_matrix_train), self.out_log, self.global_log)

        # testing
        # predict data from x_test
        if self.scale:
            X_test = scaler.transform(X_test)
        y_hat_test = model.predict(X_test)
        # Probabilities are computed once: the raw values feed the log loss and
        # the plot, a rounded copy goes into the test table.
        yhat_prob = model.predict_proba(X_test)
        test_table = pd.DataFrame()
        test_table['P' + np.array2string(np.unique(y_test))] = tuple(map(tuple, np.around(yhat_prob, decimals=2)))
        # Drop the original row labels so the target lines up with the
        # positional index of the probabilities column.
        y_test = y_test.reset_index(drop=True)
        test_table['target'] = y_test
        fu.log('Testing\n\nTEST DATA\n\n%s\n' % test_table, self.out_log, self.global_log)

        # classification report
        cr = classification_report(y_test, y_hat_test)
        # log loss
        l_loss = log_loss(y_test, yhat_prob)
        fu.log('Calculating scores and report for testing dataset\n\nCLASSIFICATION REPORT\n\n%s\nLog loss: %.3f\n' % (cr, l_loss), self.out_log, self.global_log)

        # compute confusion matrix
        cnf_matrix, cm_type = self._build_confusion_matrix(y_test, y_hat_test)
        fu.log('Calculating confusion matrix for testing dataset\n\n%s\n\n%s\n' % (cm_type, cnf_matrix), self.out_log, self.global_log)

        if self.io_dict["out"]["output_test_table_path"]:
            fu.log('Saving testing data to %s' % self.io_dict["out"]["output_test_table_path"], self.out_log, self.global_log)
            test_table.to_csv(self.io_dict["out"]["output_test_table_path"], index=False, header=True)

        # Sorted distinct target values, used by the plot and stored with the model.
        target_values = sorted(y.unique().tolist())

        # plot
        if self.io_dict["out"]["output_plot_path"]:
            if len(target_values) > 2:
                plot = plotMultipleCM(cnf_matrix_train, cnf_matrix, self.normalize_cm, target_values)
                fu.log('Saving confusion matrix plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            else:
                plot = plotBinaryClassifier(model, yhat_prob_train, yhat_prob, cnf_matrix_train, cnf_matrix, y_train, y_test, normalize=self.normalize_cm)
                fu.log('Saving binary classifier evaluator plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # save model, scaler and parameters
        variables = {
            'target': self.target,
            'independent_vars': self.independent_vars,
            'scale': self.scale,
            'target_values': target_values
        }
        fu.log('Saving model to %s' % self.io_dict["out"]["output_model_path"], self.out_log, self.global_log)
        # Objects are dumped one after another into the same file:
        # model, then scaler (only when scaling was applied), then variables.
        with open(self.io_dict["out"]["output_model_path"], "wb") as f:
            joblib.dump(model, f)
            if self.scale:
                joblib.dump(scaler, f)
            joblib.dump(variables, f)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

269 

270 

def k_neighbors(input_dataset_path: str, output_model_path: str, output_test_table_path: str = None, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Create a :class:`KNeighborsTrain <classification.k_neighbors.KNeighborsTrain>` object from the
    given arguments and return the result of its
    :meth:`launch() <classification.k_neighbors.KNeighborsTrain.launch>` method."""

    trainer = KNeighborsTrain(
        input_dataset_path=input_dataset_path,
        output_model_path=output_model_path,
        output_test_table_path=output_test_table_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs
    )
    return trainer.launch()

280 

281 

def main():
    """Command line execution of this building block. Please check the command line documentation."""

    def wide_formatter(prog):
        # Raw-text help with a very large width so help lines are not wrapped.
        return argparse.RawTextHelpFormatter(prog, width=99999)

    cli = argparse.ArgumentParser(description="Wrapper of the scikit-learn KNeighborsClassifier method. ", formatter_class=wide_formatter)
    cli.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    mandatory = cli.add_argument_group('required arguments')
    for flag, help_text in (
        ('--input_dataset_path', 'Path to the input dataset. Accepted formats: csv.'),
        ('--output_model_path', 'Path to the output model file. Accepted formats: pkl.'),
    ):
        mandatory.add_argument(flag, required=True, help=help_text)
    for flag, help_text in (
        ('--output_test_table_path', 'Path to the test table file. Accepted formats: csv.'),
        ('--output_plot_path', 'Path to the statistics plot. If target is binary it shows confusion matrix, distributions of the predicted probabilities of both classes and ROC curve. If target is non-binary it shows confusion matrix. Accepted formats: png.'),
    ):
        cli.add_argument(flag, required=False, help=help_text)

    parsed = cli.parse_args()
    # Fall back to an empty JSON object when no configuration was given.
    parsed.config = parsed.config or "{}"
    props = settings.ConfReader(config=parsed.config).get_prop_dic()

    # Specific call of each building block
    k_neighbors(
        input_dataset_path=parsed.input_dataset_path,
        output_model_path=parsed.output_model_path,
        output_test_table_path=parsed.output_test_table_path,
        output_plot_path=parsed.output_plot_path,
        properties=props,
    )

304 

305 

# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()