Coverage for biobb_ml/clustering/k_means_coefficient.py: 85%

92 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the KMeansCoefficient class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6import numpy as np 

7from biobb_common.generic.biobb_object import BiobbObject 

8from sklearn.preprocessing import StandardScaler 

9from biobb_common.configuration import settings 

10from biobb_common.tools import file_utils as fu 

11from biobb_common.tools.file_utils import launchlogger 

12from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, getWCSS, get_best_K, getGap, getSilhouetthe, plotKmeansTrain 

13 

14 

class KMeansCoefficient(BiobbObject):
    """
    | biobb_ml KMeansCoefficient
    | Wrapper of the scikit-learn KMeans method.
    | Clusters a given dataset and calculates best K coefficient. Visit the `KMeans documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_k_means_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Table with WCSS (elbow method), Gap and Silhouette coefficients for each cluster. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_k_means_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the elbow method and gap statistics plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_k_means_coefficient.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **max_clusters** (*int*) - (6) [1~100|1] Maximum number of clusters to use by default for kmeans queries.
            * **random_state_method** (*int*) - (5) [1~1000|1] Determines random number generation for centroid initialization.
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.clustering.k_means_coefficient import k_means_coefficient
            prop = {
                'predictors': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'max_clusters': 3
            }
            k_means_coefficient(input_dataset_path='/path/to/myDataset.csv',
                                output_results_path='/path/to/newTable.csv',
                                output_plot_path='/path/to/newPlot.png',
                                properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn KMeans
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        """Store the I/O paths and the tool properties, then validate them.

        A missing (``None``) or empty ``properties`` is replaced by an empty
        dict, so every tool parameter falls back to its default value.
        """
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the local names at this point (the constructor
        # arguments). Keep it before any new local variable is introduced,
        # otherwise the snapshot would contain it too.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.predictors = properties.get('predictors', {})
        self.max_clusters = properties.get('max_clusters', 6)
        self.random_state_method = properties.get('random_state_method', 5)
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters

        Each path stored in ``self.io_dict`` is replaced by the value returned
        by the corresponding ``check_input_path`` / ``check_output_path``
        helper. The plot path is only checked when one was provided.
        ``err_log`` is accepted but not used here.
        """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        # NOTE(review): the boolean argument is False for the results file and
        # True for the plot; presumably it flags an optional output - confirm
        # against check_output_path.
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`KMeansCoefficient <clustering.k_means_coefficient.KMeansCoefficient>` clustering.k_means_coefficient.KMeansCoefficient object.

        Loads the dataset, selects the predictors, logs the Hopkins statistic,
        optionally scales the predictors, and then computes the WCSS (elbow),
        Gap and Silhouette coefficients. The best number of clusters found by
        each method is written to ``output_results_path`` and, when
        ``output_plot_path`` is set, a plot of the three methods is saved.

        Returns:
            int: 0, both when the run is skipped by ``check_restart()`` and
            when it completes.
        """

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.predictors:
            # Predictors are addressed by name: take the labels from the
            # header and skip that first row when reading the data.
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # the features are the predictors
        predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__)
        fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log)

        # Hopkins test (run on the predictors before any scaling)
        H = hopkins(predictors)
        fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log)

        # scale dataset
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            predictors = scaler.fit_transform(predictors)

        # calculate wcss for each cluster
        fu.log('Calculating Within-Clusters Sum of Squares (WCSS) for each %d clusters' % self.max_clusters, self.out_log, self.global_log)
        wcss = getWCSS('kmeans', self.max_clusters, predictors)

        # wcss table
        wcss_table = pd.DataFrame(data={'cluster': np.arange(1, self.max_clusters + 1), 'WCSS': wcss})
        fu.log('Calculating WCSS for each cluster\n\nWCSS TABLE\n\n%s\n' % wcss_table.to_string(index=False), self.out_log, self.global_log)

        # get best cluster elbow method
        best_k, elbow_index = get_best_K(wcss)
        fu.log('Optimal number of clusters according to the Elbow Method is %d' % best_k, self.out_log, self.global_log)

        # calculate gap
        best_g, gap = getGap('kmeans', predictors, nrefs=5, maxClusters=(self.max_clusters + 1))

        # gap table
        gap_table = pd.DataFrame(data={'cluster': np.arange(1, self.max_clusters + 1), 'GAP': gap['gap']})
        fu.log('Calculating Gap for each cluster\n\nGAP TABLE\n\n%s\n' % gap_table.to_string(index=False), self.out_log, self.global_log)

        # log best cluster gap method
        fu.log('Optimal number of clusters according to the Gap Statistics Method is %d' % best_g, self.out_log, self.global_log)

        # calculate silhouette
        silhouette_list, s_list = getSilhouetthe(method='kmeans', X=predictors, max_clusters=self.max_clusters, random_state=self.random_state_method)

        # silhouette table
        silhouette_table = pd.DataFrame(data={'cluster': np.arange(1, self.max_clusters + 1), 'SILHOUETTE': silhouette_list})
        fu.log('Calculating Silhouette for each cluster\n\nSILHOUETTE TABLE\n\n%s\n' % silhouette_table.to_string(index=False), self.out_log, self.global_log)

        # get best cluster silhouette method: the entry of s_list sitting at
        # the position of the highest silhouette coefficient
        best_silhouette = max(silhouette_list)
        key = silhouette_list.index(best_silhouette)
        best_s = s_list[key]
        fu.log('Optimal number of clusters according to the Silhouette Method is %d' % best_s, self.out_log, self.global_log)

        # save results table
        results_table = pd.DataFrame(data={'method': ['elbow', 'gap', 'silhouette'], 'coefficient': [wcss[elbow_index], max(gap['gap']), best_silhouette], 'clusters': [best_k, best_g, best_s]})
        fu.log('Gathering results\n\nRESULTS TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)
        fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        results_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        # wcss plot
        if self.io_dict["out"]["output_plot_path"]:
            fu.log('Saving methods plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot = plotKmeansTrain(self.max_clusters, wcss, gap['gap'], silhouette_list, best_k, best_g, best_s)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # Copy files to host
        self.copy_to_host()

        # Register the staging directory for removal
        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        # Final check of the created outputs; a failure here does not raise
        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

185 

186 

def k_means_coefficient(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Execute the :class:`KMeansCoefficient <clustering.k_means_coefficient.KMeansCoefficient>` class and
    execute the :meth:`launch() <clustering.k_means_coefficient.KMeansCoefficient.launch>` method.

    All arguments are forwarded unchanged to the class constructor; the value
    returned by ``launch()`` is returned to the caller.
    """
    building_block = KMeansCoefficient(
        input_dataset_path=input_dataset_path,
        output_results_path=output_results_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs,
    )
    return building_block.launch()

195 

196 

def main():
    """Command line execution of this building block. Please check the command line documentation."""

    def wide_formatter(prog):
        # Raw-text help with a very large width so help lines are not wrapped.
        return argparse.RawTextHelpFormatter(prog, width=99999)

    cli = argparse.ArgumentParser(
        description="Wrapper of the scikit-learn KMeans method.",
        formatter_class=wide_formatter,
    )
    cli.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    mandatory = cli.add_argument_group('required arguments')
    mandatory.add_argument(
        '--input_dataset_path',
        required=True,
        help='Path to the input dataset. Accepted formats: csv.',
    )
    mandatory.add_argument(
        '--output_results_path',
        required=True,
        help='Table with WCSS (elbow method), Gap and Silhouette coefficients for each cluster. Accepted formats: csv.',
    )
    cli.add_argument(
        '--output_plot_path',
        required=False,
        help='Path to the elbow and gap methods plot. Accepted formats: png.',
    )

    parsed = cli.parse_args()
    # Without --config, an empty JSON object is handed to the reader.
    if not parsed.config:
        parsed.config = "{}"
    tool_properties = settings.ConfReader(config=parsed.config).get_prop_dic()

    # Specific call of each building block
    k_means_coefficient(
        input_dataset_path=parsed.input_dataset_path,
        output_results_path=parsed.output_results_path,
        output_plot_path=parsed.output_plot_path,
        properties=tool_properties,
    )

217 

218 

# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()