Coverage for biobb_ml/clustering/k_means.py: 84%

98 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the KMeansClustering class and the command line interface.""" 

4import argparse 

5import joblib 

6import pandas as pd 

7from biobb_common.generic.biobb_object import BiobbObject 

8from sklearn.preprocessing import StandardScaler 

9from sklearn.cluster import KMeans 

10from biobb_common.configuration import settings 

11from biobb_common.tools import file_utils as fu 

12from biobb_common.tools.file_utils import launchlogger 

13from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, plotCluster 

14 

15 

16class KMeansClustering(BiobbObject): 

17 """ 

18 | biobb_ml KMeansClustering 

19 | Wrapper of the scikit-learn KMeans method. 

20 | Clusters a given dataset and saves the model and scaler. Visit the `KMeans documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html>`_ in the sklearn official website for further information. 

21 

22 Args: 

23 input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_k_means.csv>`_. Accepted formats: csv (edam:format_3752). 

24 output_results_path (str): Path to the clustered dataset. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_k_means.csv>`_. Accepted formats: csv (edam:format_3752). 

25 output_model_path (str): Path to the output model file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_model_k_means.pkl>`_. Accepted formats: pkl (edam:format_3653). 

26 output_plot_path (str) (Optional): Path to the clustering plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_k_means.png>`_. Accepted formats: png (edam:format_3603). 

27 properties (dic - Python dictionary object containing the tool parameters, not input/output files): 

28 * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of mulitple formats, the first one will be picked. 

29 * **clusters** (*int*) - (3) [1~100|1] The number of clusters to form as well as the number of centroids to generate. 

30 * **plots** (*list*) - (None) List of dictionaries with all plots you want to generate. Only 2D or 3D plots accepted. Format: [ { 'title': 'Plot 1', 'features': ['feat1', 'feat2'] } ]. 

31 * **random_state_method** (*int*) - (5) [1~1000|1] Determines random number generation for centroid initialization. 

32 * **scale** (*bool*) - (False) Whether or not to scale the input dataset. 

33 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

34 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

35 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

36 

37 Examples: 

38 This is a use example of how to use the building block from Python:: 

39 

40 from biobb_ml.clustering.k_means import k_means 

41 prop = { 

42 'predictors': { 

43 'columns': [ 'column1', 'column2', 'column3' ] 

44 }, 

45 'clusters': 3, 

46 'plots': [ 

47 { 

48 'title': 'Plot 1', 

49 'features': ['feat1', 'feat2'] 

50 } 

51 ] 

52 } 

53 k_means(input_dataset_path='/path/to/myDataset.csv', 

54 output_results_path='/path/to/newTable.csv', 

55 output_model_path='/path/to/newModel.pkl', 

56 output_plot_path='/path/to/newPlot.png', 

57 properties=prop) 

58 

59 Info: 

60 * wrapped_software: 

61 * name: scikit-learn KMeans 

62 * version: >=0.24.2 

63 * license: BSD 3-Clause 

64 * ontology: 

65 * name: EDAM 

66 * schema: http://edamontology.org/EDAM.owl 

67 

68 """ 

69 

70 def __init__(self, input_dataset_path, output_results_path, output_model_path, 

71 output_plot_path=None, properties=None, **kwargs) -> None: 

72 properties = properties or {} 

73 

74 # Call parent class constructor 

75 super().__init__(properties) 

76 self.locals_var_dict = locals().copy() 

77 

78 # Input/Output files 

79 self.io_dict = { 

80 "in": {"input_dataset_path": input_dataset_path}, 

81 "out": {"output_results_path": output_results_path, "output_model_path": output_model_path, "output_plot_path": output_plot_path} 

82 } 

83 

84 # Properties specific for BB 

85 self.predictors = properties.get('predictors', {}) 

86 self.clusters = properties.get('clusters', 3) 

87 self.plots = properties.get('plots', []) 

88 self.random_state_method = properties.get('random_state_method', 5) 

89 self.scale = properties.get('scale', False) 

90 self.properties = properties 

91 

92 # Check the properties 

93 self.check_properties(properties) 

94 self.check_arguments() 

95 

96 def check_data_params(self, out_log, err_log): 

97 """ Checks all the input/output paths and parameters """ 

98 self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__) 

99 self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__) 

100 self.io_dict["out"]["output_model_path"] = check_output_path(self.io_dict["out"]["output_model_path"], "output_model_path", False, out_log, self.__class__.__name__) 

101 if self.io_dict["out"]["output_plot_path"]: 

102 self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__) 

103 

104 @launchlogger 

105 def launch(self) -> int: 

106 """Execute the :class:`KMeansClustering <clustering.k_means.KMeansClustering>` clustering.k_means.KMeansClustering object.""" 

107 

108 # check input/output paths and parameters 

109 self.check_data_params(self.out_log, self.err_log) 

110 

111 # Setup Biobb 

112 if self.check_restart(): 

113 return 0 

114 self.stage_files() 

115 

116 # load dataset 

117 fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log) 

118 if 'columns' in self.predictors: 

119 labels = getHeader(self.io_dict["in"]["input_dataset_path"]) 

120 skiprows = 1 

121 else: 

122 labels = None 

123 skiprows = None 

124 data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels) 

125 

126 # the features are the predictors 

127 predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__) 

128 fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log) 

129 

130 # Hopkins test 

131 H = hopkins(predictors) 

132 fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log) 

133 

134 # scale dataset 

135 if self.scale: 

136 fu.log('Scaling dataset', self.out_log, self.global_log) 

137 scaler = StandardScaler() 

138 predictors = scaler.fit_transform(predictors) 

139 

140 # create a k-means object with self.clusters clusters 

141 model = KMeans(n_clusters=self.clusters, random_state=self.random_state_method) 

142 # fit the data 

143 model.fit(predictors) 

144 

145 # create a copy of data, so we can see the clusters next to the original data 

146 clusters = data.copy() 

147 # predict the cluster for each observation 

148 clusters['cluster'] = model.predict(predictors) 

149 

150 fu.log('Calculating results\n\nCLUSTERING TABLE\n\n%s\n' % clusters, self.out_log, self.global_log) 

151 

152 # save results 

153 fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log) 

154 clusters.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f') 

155 

156 if self.io_dict["out"]["output_plot_path"] and self.plots: 

157 new_plots = [] 

158 i = 0 

159 for plot in self.plots: 

160 if len(plot['features']) == 2 or len(plot['features']) == 3: 

161 new_plots.append(plot) 

162 i += 1 

163 if i == 6: 

164 break 

165 

166 plot = plotCluster(new_plots, clusters) 

167 fu.log('Saving output plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log) 

168 plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150) 

169 

170 # save model, scaler and parameters 

171 variables = { 

172 'predictors': self.predictors, 

173 'scale': self.scale, 

174 } 

175 fu.log('Saving model to %s' % self.io_dict["out"]["output_model_path"], self.out_log, self.global_log) 

176 with open(self.io_dict["out"]["output_model_path"], "wb") as f: 

177 joblib.dump(model, f) 

178 if self.scale: 

179 joblib.dump(scaler, f) 

180 joblib.dump(variables, f) 

181 

182 # Copy files to host 

183 self.copy_to_host() 

184 

185 self.tmp_files.extend([ 

186 self.stage_io_dict.get("unique_dir") 

187 ]) 

188 self.remove_tmp_files() 

189 

190 self.check_arguments(output_files_created=True, raise_exception=False) 

191 

192 return 0 

193 

194 

195def k_means(input_dataset_path: str, output_results_path: str, output_model_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int: 

196 """Execute the :class:`KMeansClustering <clustering.k_means.KMeansClustering>` class and 

197 execute the :meth:`launch() <clustering.k_means.KMeansClustering.launch>` method.""" 

198 

199 return KMeansClustering(input_dataset_path=input_dataset_path, 

200 output_results_path=output_results_path, 

201 output_model_path=output_model_path, 

202 output_plot_path=output_plot_path, 

203 properties=properties, **kwargs).launch() 

204 

205 

206def main(): 

207 """Command line execution of this building block. Please check the command line documentation.""" 

208 parser = argparse.ArgumentParser(description="Wrapper of the scikit-learn KMeans method.", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

209 parser.add_argument('--config', required=False, help='Configuration file') 

210 

211 # Specific args of each building block 

212 required_args = parser.add_argument_group('required arguments') 

213 required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.') 

214 required_args.add_argument('--output_results_path', required=True, help='Path to the clustered dataset. Accepted formats: csv.') 

215 required_args.add_argument('--output_model_path', required=True, help='Path to the output model file. Accepted formats: pkl.') 

216 parser.add_argument('--output_plot_path', required=False, help='Path to the clustering plot. Accepted formats: png.') 

217 

218 args = parser.parse_args() 

219 args.config = args.config or "{}" 

220 properties = settings.ConfReader(config=args.config).get_prop_dic() 

221 

222 # Specific call of each building block 

223 k_means(input_dataset_path=args.input_dataset_path, 

224 output_results_path=args.output_results_path, 

225 output_model_path=args.output_model_path, 

226 output_plot_path=args.output_plot_path, 

227 properties=properties) 

228 

229 

230if __name__ == '__main__': 

231 main()