Coverage for biobb_ml/classification/k_neighbors.py: 83%

151 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:39 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the KNeighborsTrain class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6import numpy as np 

7import joblib 

8from biobb_common.generic.biobb_object import BiobbObject 

9from sklearn.preprocessing import StandardScaler 

10from sklearn.model_selection import train_test_split 

11from sklearn.metrics import confusion_matrix, classification_report, log_loss 

12from sklearn.neighbors import KNeighborsClassifier 

13from biobb_common.configuration import settings 

14from biobb_common.tools import file_utils as fu 

15from biobb_common.tools.file_utils import launchlogger 

16from biobb_ml.classification.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, getTarget, getTargetValue, getWeight, plotMultipleCM, plotBinaryClassifier 

17 

18 

class KNeighborsTrain(BiobbObject):
    """
    | biobb_ml KNeighborsTrain
    | Wrapper of the scikit-learn KNeighborsClassifier method.
    | Trains and tests a given dataset and saves the model and scaler. Visit the `KNeighborsClassifier documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsClassifier.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/classification/dataset_k_neighbors.csv>`_. Accepted formats: csv (edam:format_3752).
        output_model_path (str): Path to the output model file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/classification/ref_output_model_k_neighbors.pkl>`_. Accepted formats: pkl (edam:format_3653).
        output_test_table_path (str) (Optional): Path to the test table file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/classification/ref_output_test_k_neighbors.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the statistics plot. If target is binary it shows confusion matrix, distributions of the predicted probabilities of both classes and ROC curve. If target is non-binary it shows confusion matrix. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/classification/ref_output_plot_k_neighbors.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **independent_vars** (*dict*) - ({}) Independent variables you want to train from your dataset. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **target** (*dict*) - ({}) Dependent variable you want to predict from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked.
            * **weight** (*dict*) - ({}) Weight variable from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked. KNeighborsClassifier.fit does not accept sample weights, so this column is not used for training.
            * **metric** (*string*) - ("minkowski") The distance metric to use for the tree. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), manhattan (Compute the Manhattan distance), chebyshev (Compute the Chebyshev distance), minkowski (Compute the Minkowski distance between two 1-D arrays), wminkowski (Compute the weighted Minkowski distance between two 1-D arrays), seuclidean (Return the standardized Euclidean distance between two 1-D arrays), mahalanobi (Compute the Mahalanobis distance between two 1-D arrays).
            * **n_neighbors** (*int*) - (6) [1~100|1] Number of neighbors to use by default for kneighbors queries.
            * **normalize_cm** (*bool*) - (False) Whether or not to normalize the confusion matrix.
            * **random_state_train_test** (*int*) - (5) [1~1000|1] Controls the shuffling applied to the data before applying the split.
            * **test_size** (*float*) - (0.2) [0~1|0.05] Represents the proportion of the dataset to include in the test split. It should be between 0.0 and 1.0.
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.classification.k_neighbors import k_neighbors
            prop = {
                'independent_vars': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'target': {
                    'column': 'target'
                },
                'n_neighbors': 6,
                'test_size': 0.2
            }
            k_neighbors(input_dataset_path='/path/to/myDataset.csv',
                        output_model_path='/path/to/newModel.pkl',
                        output_test_table_path='/path/to/newTable.csv',
                        output_plot_path='/path/to/newPlot.png',
                        properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn KNeighborsClassifier
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_model_path,
                 output_test_table_path=None, output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor arguments; must be taken before any other local is created.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_model_path": output_model_path, "output_test_table_path": output_test_table_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.independent_vars = properties.get('independent_vars', {})
        self.target = properties.get('target', {})
        self.weight = properties.get('weight', {})
        self.metric = properties.get('metric', 'minkowski')
        self.n_neighbors = properties.get('n_neighbors', 6)
        self.normalize_cm = properties.get('normalize_cm', False)
        self.random_state_train_test = properties.get('random_state_train_test', 5)
        self.test_size = properties.get('test_size', 0.2)
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_model_path"] = check_output_path(self.io_dict["out"]["output_model_path"], "output_model_path", False, out_log, self.__class__.__name__)
        # Optional outputs are only checked when the caller asked for them.
        if self.io_dict["out"]["output_test_table_path"]:
            self.io_dict["out"]["output_test_table_path"] = check_output_path(self.io_dict["out"]["output_test_table_path"], "output_test_table_path", True, out_log, self.__class__.__name__)
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    def _confusion_matrix(self, y_true, y_pred):
        """Return ``(matrix, title)`` for the given labels, row-normalized when ``normalize_cm`` is set."""
        matrix = confusion_matrix(y_true, y_pred)
        if self.normalize_cm:
            # Divide every row by its total so each row sums to 1.
            matrix = matrix.astype('float') / matrix.sum(axis=1)[:, np.newaxis]
            return matrix, 'NORMALIZED CONFUSION MATRIX'
        return matrix, 'CONFUSION MATRIX, WITHOUT NORMALIZATION'

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`KNeighborsTrain <classification.k_neighbors.KNeighborsTrain>` classification.k_neighbors.KNeighborsTrain object."""

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.independent_vars:
            # Columns are addressed by name: read the header separately and skip it in the data.
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # declare inputs, targets and weights
        # the inputs are all the independent variables
        X = getIndependentVars(self.independent_vars, data, self.out_log, self.__class__.__name__)
        fu.log('Independent variables: [%s]' % (getIndependentVarsList(self.independent_vars)), self.out_log, self.global_log)
        # target
        y = getTarget(self.target, data, self.out_log, self.__class__.__name__)
        fu.log('Target: %s' % (getTargetValue(self.target)), self.out_log, self.global_log)
        # weights
        if self.weight:
            # The column is still resolved so that getWeight can report a bad specification
            # (presumably it validates the column; confirm in classification.common).
            getWeight(self.weight, data, self.out_log, self.__class__.__name__)
            fu.log('Weight column provided', self.out_log, self.global_log)
            # KNeighborsClassifier.fit(X, y) has no sample_weight parameter; passing a third
            # positional argument raises TypeError, so the weights are not forwarded.
            fu.log('KNeighborsClassifier does not support sample weights: the weight column is ignored for training', self.out_log, self.global_log)

        # train / test split
        fu.log('Creating train and test sets', self.out_log, self.global_log)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=self.test_size, random_state=self.random_state_train_test)

        # scale dataset
        scaler = None
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            # Fit on the training split only; the test split is transformed with the same statistics below.
            X_train = scaler.fit_transform(X_train)

        # classification
        fu.log('Training dataset applying k neighbors classification', self.out_log, self.global_log)
        model = KNeighborsClassifier(n_neighbors=self.n_neighbors, metric=self.metric)
        model.fit(X_train, y_train)

        y_hat_train = model.predict(X_train)
        # classification report
        cr_train = classification_report(y_train, y_hat_train)
        # log loss
        yhat_prob_train = model.predict_proba(X_train)
        l_loss_train = log_loss(y_train, yhat_prob_train)
        fu.log('Calculating scores and report for training dataset\n\nCLASSIFICATION REPORT\n\n%s\nLog loss: %.3f\n' % (cr_train, l_loss_train), self.out_log, self.global_log)

        # compute confusion matrix
        np.set_printoptions(precision=2)
        cnf_matrix_train, cm_type = self._confusion_matrix(y_train, y_hat_train)
        fu.log('Calculating confusion matrix for training dataset\n\n%s\n\n%s\n' % (cm_type, cnf_matrix_train), self.out_log, self.global_log)

        # testing
        # predict data from x_test
        if self.scale:
            X_test = scaler.transform(X_test)
        y_hat_test = model.predict(X_test)
        # Raw probabilities feed log loss and the plot; a rounded copy goes into the test table.
        yhat_prob = model.predict_proba(X_test)
        y_hat_prob = tuple(map(tuple, np.around(yhat_prob, decimals=2)))
        test_table = pd.DataFrame()
        # predict_proba columns follow model.classes_, so label the column with those classes.
        test_table['P' + np.array2string(model.classes_)] = y_hat_prob
        y_test = y_test.reset_index(drop=True)
        test_table['target'] = y_test
        fu.log('Testing\n\nTEST DATA\n\n%s\n' % test_table, self.out_log, self.global_log)

        # classification report
        cr = classification_report(y_test, y_hat_test)
        # log loss; labels are given explicitly so a class missing from the test split does not break it
        l_loss = log_loss(y_test, yhat_prob, labels=model.classes_)
        fu.log('Calculating scores and report for testing dataset\n\nCLASSIFICATION REPORT\n\n%s\nLog loss: %.3f\n' % (cr, l_loss), self.out_log, self.global_log)

        # compute confusion matrix
        cnf_matrix, cm_type = self._confusion_matrix(y_test, y_hat_test)
        fu.log('Calculating confusion matrix for testing dataset\n\n%s\n\n%s\n' % (cm_type, cnf_matrix), self.out_log, self.global_log)

        if (self.io_dict["out"]["output_test_table_path"]):
            fu.log('Saving testing data to %s' % self.io_dict["out"]["output_test_table_path"], self.out_log, self.global_log)
            test_table.to_csv(self.io_dict["out"]["output_test_table_path"], index=False, header=True)

        # Sorted distinct target values: used for the plot and stored with the model.
        target_values = sorted(y.unique().tolist())

        # plot
        if self.io_dict["out"]["output_plot_path"]:
            if len(target_values) > 2:
                plot = plotMultipleCM(cnf_matrix_train, cnf_matrix, self.normalize_cm, target_values)
                fu.log('Saving confusion matrix plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            else:
                plot = plotBinaryClassifier(model, yhat_prob_train, yhat_prob, cnf_matrix_train, cnf_matrix, y_train, y_test, normalize=self.normalize_cm)
                fu.log('Saving binary classifier evaluator plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # save model, scaler and parameters
        variables = {
            'target': self.target,
            'independent_vars': self.independent_vars,
            'scale': self.scale,
            'target_values': target_values
        }
        fu.log('Saving model to %s' % self.io_dict["out"]["output_model_path"], self.out_log, self.global_log)
        # Objects are dumped one after another into the same file: model, optional scaler, variables.
        with open(self.io_dict["out"]["output_model_path"], "wb") as f:
            joblib.dump(model, f)
            if self.scale:
                joblib.dump(scaler, f)
            joblib.dump(variables, f)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

268 

269 

def k_neighbors(input_dataset_path: str, output_model_path: str, output_test_table_path: str = None, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Build a :class:`KNeighborsTrain <classification.k_neighbors.KNeighborsTrain>` object from the given
    arguments and return the result of its :meth:`launch() <classification.k_neighbors.KNeighborsTrain.launch>` method."""

    # Every argument is forwarded by keyword, unchanged, to the building block.
    building_block = KNeighborsTrain(
        input_dataset_path=input_dataset_path,
        output_model_path=output_model_path,
        output_test_table_path=output_test_table_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs
    )
    return building_block.launch()

279 

280 

def main():
    """Command line execution of this building block. Please check the command line documentation."""

    def _wide_raw_formatter(prog):
        # Very large width so long help strings are not wrapped.
        return argparse.RawTextHelpFormatter(prog, width=99999)

    parser = argparse.ArgumentParser(
        description="Wrapper of the scikit-learn KNeighborsClassifier method. ",
        formatter_class=_wide_raw_formatter,
    )
    parser.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    required_args = parser.add_argument_group('required arguments')
    required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
    required_args.add_argument('--output_model_path', required=True, help='Path to the output model file. Accepted formats: pkl.')
    parser.add_argument('--output_test_table_path', required=False, help='Path to the test table file. Accepted formats: csv.')
    parser.add_argument('--output_plot_path', required=False, help='Path to the statistics plot. If target is binary it shows confusion matrix, distributions of the predicted probabilities of both classes and ROC curve. If target is non-binary it shows confusion matrix. Accepted formats: png.')

    args = parser.parse_args()
    # With no --config given, an empty JSON object string is handed to the configuration reader.
    config = args.config or "{}"
    properties = settings.ConfReader(config=config).get_prop_dic()

    # Specific call of each building block
    k_neighbors(
        input_dataset_path=args.input_dataset_path,
        output_model_path=args.output_model_path,
        output_test_table_path=args.output_test_table_path,
        output_plot_path=args.output_plot_path,
        properties=properties,
    )

303 

304 

# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()