Coverage for biobb_ml/resampling/oversampling.py: 78%

157 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:39 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the Oversampling class and the command line interface.""" 

4import argparse 

5import numpy as np 

6import pandas as pd 

7from collections import Counter 

8from biobb_common.generic.biobb_object import BiobbObject 

9from sklearn import preprocessing 

10from sklearn.model_selection import cross_val_score 

11from sklearn.model_selection import RepeatedStratifiedKFold 

12from sklearn.ensemble import RandomForestClassifier 

13from biobb_ml.resampling.reg_resampler import resampler 

14from biobb_common.configuration import settings 

15from biobb_common.tools import file_utils as fu 

16from biobb_common.tools.file_utils import launchlogger 

17from biobb_ml.resampling.common import check_input_path, check_output_path, getResamplingMethod, checkResamplingType, getSamplingStrategy, getTargetValue, getHeader, getTarget, oversampling_methods 

18 

19 

class Oversampling(BiobbObject):
    """
    | biobb_ml Oversampling
    | Wrapper of most of the imblearn.over_sampling methods.
    | Involves supplementing the training data with multiple copies of some of the minority classes of a given dataset. If regression is specified as type, the data will be resampled to classes in order to apply the oversampling model. Visit the imbalanced-learn official website for the different methods accepted in this wrapper: `RandomOverSampler <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.RandomOverSampler.html>`_, `SMOTE <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.SMOTE.html>`_, `BorderlineSMOTE <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.BorderlineSMOTE.html>`_, `SVMSMOTE <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.SVMSMOTE.html>`_, `ADASYN <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.ADASYN.html>`_

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/resampling/dataset_resampling.csv>`_. Accepted formats: csv (edam:format_3752).
        output_dataset_path (str): Path to the output dataset. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/resampling/ref_output_oversampling.csv>`_. Accepted formats: csv (edam:format_3752).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **method** (*str*) - (None) Oversampling method. It's a mandatory property. Values: random (`RandomOverSampler <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.RandomOverSampler.html>`_: Object to over-sample the minority classes by picking samples at random with replacement), smote (`SMOTE <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.SMOTE.html>`_: This object is an implementation of SMOTE - Synthetic Minority Over-sampling Technique), borderline (`BorderlineSMOTE <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.BorderlineSMOTE.html>`_: This algorithm is a variant of the original SMOTE algorithm. Borderline samples will be detected and used to generate new synthetic samples), svmsmote (`SVMSMOTE <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.SVMSMOTE.html>`_: Variant of SMOTE algorithm which use an SVM algorithm to detect sample to use for generating new synthetic samples), adasyn (`ADASYN <https://imbalanced-learn.readthedocs.io/en/stable/generated/imblearn.over_sampling.ADASYN.html>`_: Perform over-sampling using Adaptive Synthetic -ADASYN- sampling approach for imbalanced datasets).
            * **type** (*str*) - (None) Type of oversampling. It's a mandatory property. Values: regression (the oversampling will be applied on a continuous dataset), classification (the oversampling will be applied on a classified dataset).
            * **target** (*dict*) - ({}) Dependent variable you want to predict from your dataset. You can specify either a column name or a column index. Formats: { "column": "column3" } or { "index": 21 }. In case of multiple formats, the first one will be picked.
            * **evaluate** (*bool*) - (False) Whether or not to evaluate the dataset before and after applying the resampling.
            * **evaluate_splits** (*int*) - (3) [2~100|1] Number of folds to be applied by the Repeated Stratified K-Fold evaluation method. Must be at least 2.
            * **evaluate_repeats** (*int*) - (3) [2~100|1] Number of times Repeated Stratified K-Fold cross validator needs to be repeated.
            * **n_bins** (*int*) - (5) [1~100|1] Only for regression oversampling. The number of classes that the user wants to generate with the target data.
            * **balanced_binning** (*bool*) - (False) Only for regression oversampling. Decides whether samples are to be distributed roughly equally across all classes.
            * **sampling_strategy** (*dict*) - ({ "target": "auto" }) Sampling information to sample the data set. Formats: { "target": "auto" }, { "ratio": 0.3 }, { "dict": { 0: 300, 1: 200, 2: 100 } } or { "list": [0, 2, 3] }. When "target", specify the class targeted by the resampling; the number of samples in the different classes will be equalized; possible choices are: minority (resample only the minority class), not minority (resample all classes but the minority class), not majority (resample all classes but the majority class), all (resample all classes), auto (equivalent to 'not majority'). When "ratio", it corresponds to the desired ratio of the number of samples in the minority class over the number of samples in the majority class after resampling (ONLY IN CASE OF BINARY CLASSIFICATION). When "dict", the keys correspond to the targeted classes, the values correspond to the desired number of samples for each targeted class. When "list", the list contains the classes targeted by the resampling.
            * **k_neighbors** (*int*) - (5) [1~100|1] Only for SMOTE, BorderlineSMOTE, SVMSMOTE, ADASYN. The number of nearest neighbours used to construct synthetic samples.
            * **random_state_method** (*int*) - (5) [1~1000|1] Controls the randomization of the algorithm.
            * **random_state_evaluate** (*int*) - (5) [1~1000|1] Controls the shuffling applied to the Repeated Stratified K-Fold evaluation method.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.resampling.oversampling import oversampling
            prop = {
                'method': 'random',
                'type': 'regression',
                'target': {
                    'column': 'target'
                },
                'evaluate': True,
                'n_bins': 10,
                'sampling_strategy': {
                    'target': 'minority'
                }
            }
            oversampling(input_dataset_path='/path/to/myDataset.csv',
                        output_dataset_path='/path/to/newDataset.csv',
                        properties=prop)

    Info:
        * wrapped_software:
            * name: imbalanced-learn over_sampling
            * version: >0.7.0
            * license: MIT
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_dataset_path,
                 properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_dataset_path": output_dataset_path}
        }

        # Properties specific for BB
        self.method = properties.get('method', None)
        self.type = properties.get('type', None)
        self.target = properties.get('target', {})
        self.evaluate = properties.get('evaluate', False)
        self.evaluate_splits = properties.get('evaluate_splits', 3)
        self.evaluate_repeats = properties.get('evaluate_repeats', 3)
        self.n_bins = properties.get('n_bins', 5)
        self.balanced_binning = properties.get('balanced_binning', False)
        self.sampling_strategy = properties.get('sampling_strategy', {'target': 'auto'})
        self.k_neighbors = properties.get('k_neighbors', 5)
        self.random_state_method = properties.get('random_state_method', 5)
        self.random_state_evaluate = properties.get('random_state_evaluate', 5)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_dataset_path"] = check_output_path(self.io_dict["out"]["output_dataset_path"], "output_dataset_path", False, out_log, self.__class__.__name__)

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`Oversampling <resampling.oversampling.Oversampling>` resampling.oversampling.Oversampling object."""

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # check mandatory properties
        method = getResamplingMethod(self.method, 'oversampling', self.out_log, self.__class__.__name__)
        checkResamplingType(self.type, self.out_log, self.__class__.__name__)
        sampling_strategy = getSamplingStrategy(self.sampling_strategy, self.out_log, self.__class__.__name__)

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'column' in self.target:
            # target given by column name: first CSV row is a header
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
            header = 0
        else:
            labels = None
            skiprows = None
            header = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        train_df = data
        ranges = None

        # encode every non-numeric column; one LabelEncoder PER column so that
        # inverse_transform below decodes each column with its own mapping
        # (a single shared encoder would only be valid for the last column fitted)
        encoders = {}
        for column in train_df:
            # if type object, LabelEncoder.fit_transform
            if train_df[column].dtypes == 'object':
                encoders[column] = preprocessing.LabelEncoder()
                train_df[column] = encoders[column].fit_transform(train_df[column])

        # defining X (all columns except the target)
        X = train_df.loc[:, train_df.columns != getTargetValue(self.target, self.out_log, self.__class__.__name__)]
        # calling oversample method
        if self.method == 'random':
            method = method(sampling_strategy=sampling_strategy, random_state=self.random_state_method)
        elif self.method == 'smote':
            method = method(sampling_strategy=sampling_strategy, k_neighbors=self.k_neighbors, random_state=self.random_state_method)
        elif self.method == 'borderline':
            method = method(sampling_strategy=sampling_strategy, k_neighbors=self.k_neighbors, random_state=self.random_state_method)
        elif self.method == 'svmsmote':
            method = method(sampling_strategy=sampling_strategy, k_neighbors=self.k_neighbors, random_state=self.random_state_method)
        elif self.method == 'adasyn':
            # ADASYN names its neighbours parameter n_neighbors, not k_neighbors
            method = method(sampling_strategy=sampling_strategy, n_neighbors=self.k_neighbors, random_state=self.random_state_method)

        fu.log('Target: %s' % (getTargetValue(self.target, self.out_log, self.__class__.__name__)), self.out_log, self.global_log)

        # oversampling
        if self.type == 'regression':
            fu.log('Oversampling regression dataset, continuous data will be classified', self.out_log, self.global_log)
            # call resampler class for Regression ReSampling
            rs = resampler()
            # Create n_bins classes for the dataset
            ranges, y, target_pos = rs.fit(train_df, target=getTargetValue(self.target, self.out_log, self.__class__.__name__), bins=self.n_bins, balanced_binning=self.balanced_binning, verbose=0)
            # Get the over-sampled data
            final_X, final_y = rs.resample(method, train_df, y)
        elif self.type == 'classification':
            # get X and y
            y = getTarget(self.target, train_df, self.out_log, self.__class__.__name__)
            # fit and resample
            final_X, final_y = method.fit_resample(X, y)
            target_pos = None

        # evaluate oversampling
        if self.evaluate:
            fu.log('Evaluating data before oversampling with RandomForestClassifier', self.out_log, self.global_log)
            cv = RepeatedStratifiedKFold(n_splits=self.evaluate_splits, n_repeats=self.evaluate_repeats, random_state=self.random_state_evaluate)
            # evaluate model
            scores = cross_val_score(RandomForestClassifier(), X, y, scoring='accuracy', cv=cv, n_jobs=-1)
            if not np.isnan(np.mean(scores)):
                fu.log('Mean Accuracy before oversampling: %.3f' % (np.mean(scores)), self.out_log, self.global_log)
            else:
                fu.log('Unable to calculate cross validation score, NaN was returned.', self.out_log, self.global_log)

        # log distribution before oversampling
        dist = ''
        for k, v in Counter(y).items():
            per = v / len(y) * 100
            rng = ''
            if ranges:
                rng = str(ranges[k])
            dist = dist + 'Class=%d, n=%d (%.3f%%) %s\n' % (k, v, per, rng)
        fu.log('Classes distribution before oversampling:\n\n%s' % dist, self.out_log, self.global_log)

        # join final_X and final_y in the output dataframe
        if header is None:
            # numpy
            out_df = np.column_stack((final_X, final_y))
        else:
            # pandas
            out_df = final_X.join(final_y)

        # if no header, convert np to pd
        if header is None:
            out_df = pd.DataFrame(data=out_df)

        # if cols encoded, decode them with the encoder fitted on that column
        if encoders:
            for column, le in encoders.items():
                if header is None:
                    out_df = out_df.astype({column: int})
                out_df[column] = le.inverse_transform(out_df[column].values.ravel())

        # if no header, target is in a different column
        # NOTE: compare against None explicitly — a target at positional index 0
        # is a valid position but is falsy, so a plain truth test would discard it
        if target_pos is not None:
            t = target_pos
        else:
            t = getTargetValue(self.target, self.out_log, self.__class__.__name__)
        # log distribution after oversampling
        if self.type == 'regression':
            ranges, y_out, _ = rs.fit(out_df, target=t, bins=self.n_bins, balanced_binning=self.balanced_binning, verbose=0)
        elif self.type == 'classification':
            y_out = getTarget(self.target, out_df, self.out_log, self.__class__.__name__)

        dist = ''
        for k, v in Counter(y_out).items():
            per = v / len(y_out) * 100
            rng = ''
            if ranges:
                rng = str(ranges[k])
            dist = dist + 'Class=%d, n=%d (%.3f%%) %s\n' % (k, v, per, rng)
        fu.log('Classes distribution after oversampling:\n\n%s' % dist, self.out_log, self.global_log)

        # evaluate oversampling
        if self.evaluate:
            fu.log('Evaluating data after oversampling with RandomForestClassifier', self.out_log, self.global_log)
            cv = RepeatedStratifiedKFold(n_splits=self.evaluate_splits, n_repeats=self.evaluate_repeats, random_state=self.random_state_evaluate)
            # evaluate model
            scores = cross_val_score(RandomForestClassifier(), final_X, y_out, scoring='accuracy', cv=cv, n_jobs=-1)
            if not np.isnan(np.mean(scores)):
                fu.log('Mean Accuracy after oversampling a %s dataset with %s method: %.3f' % (self.type, oversampling_methods[self.method]['method'], np.mean(scores)), self.out_log, self.global_log)
            else:
                fu.log('Unable to calculate cross validation score, NaN was returned.', self.out_log, self.global_log)

        # save output (write the header row only when the input had one)
        hdr = False
        if header == 0:
            hdr = True
        fu.log('Saving oversampled dataset to %s' % self.io_dict["out"]["output_dataset_path"], self.out_log, self.global_log)
        out_df.to_csv(self.io_dict["out"]["output_dataset_path"], index=False, header=hdr)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

276 

277 

def oversampling(input_dataset_path: str, output_dataset_path: str, properties: dict = None, **kwargs) -> int:
    """Instantiate the :class:`Oversampling <resampling.oversampling.Oversampling>` class and
    execute the :meth:`launch() <resampling.oversampling.Oversampling.launch>` method."""

    block = Oversampling(input_dataset_path=input_dataset_path,
                         output_dataset_path=output_dataset_path,
                         properties=properties, **kwargs)
    return block.launch()

285 

286 

def main():
    """Command line execution of this building block. Please check the command line documentation."""
    cli = argparse.ArgumentParser(
        description="Wrapper of most of the imblearn.over_sampling methods.",
        formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
    cli.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    required = cli.add_argument_group('required arguments')
    required.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
    required.add_argument('--output_dataset_path', required=True, help='Path to the output dataset. Accepted formats: csv.')

    parsed = cli.parse_args()
    # fall back to an empty JSON config when --config is not supplied
    config = parsed.config or "{}"
    properties = settings.ConfReader(config=config).get_prop_dic()

    # Specific call of each building block
    oversampling(input_dataset_path=parsed.input_dataset_path,
                 output_dataset_path=parsed.output_dataset_path,
                 properties=properties)


if __name__ == '__main__':
    main()