Coverage for biobb_ml/clustering/agglomerative_coefficient.py: 83%

84 statements  

coverage.py v7.5.1, created at 2024-05-07 09:39 +0000

#!/usr/bin/env python3

"""Module containing the AgglomerativeCoefficient class and the command line interface."""
import argparse
import pandas as pd
import numpy as np
from biobb_common.generic.biobb_object import BiobbObject
from sklearn.preprocessing import StandardScaler
from biobb_common.configuration import settings
from biobb_common.tools import file_utils as fu
from biobb_common.tools.file_utils import launchlogger
from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, getSilhouetthe, plotAgglomerativeTrain


class AgglomerativeCoefficient(BiobbObject):
    """
    | biobb_ml AgglomerativeCoefficient
    | Wrapper of the scikit-learn AgglomerativeClustering method.
    | Clusters a given dataset and calculates the optimal number of clusters (K) according to the silhouette coefficient. Visit the `AgglomerativeClustering documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_agglomerative_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Path to the silhouette coefficient table. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_agglomerative_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the silhouette coefficient plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_agglomerative_coefficient.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of column names from your input dataset, a list of column indexes or a range of column indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. If multiple formats are provided, the first one is used.
            * **max_clusters** (*int*) - (6) [1~100|1] Maximum number of clusters to evaluate when searching for the best silhouette coefficient.
            * **affinity** (*str*) - ("euclidean") Metric used to compute the linkage. If linkage is "ward", only "euclidean" is accepted. Values: euclidean (computes the Euclidean distance between two 1-D arrays), l1, l2, manhattan (computes the Manhattan distance), cosine (computes the cosine distance between 1-D arrays), precomputed (a precomputed distance matrix is provided instead of the raw observations).
            * **linkage** (*str*) - ("ward") The linkage criterion determines which distance to use between sets of observations. The algorithm will merge the pairs of clusters that minimize this criterion. Values: ward (minimizes the variance of the clusters being merged), complete (uses the maximum of the distances between all observations of the two sets), average (uses the average of the distances of each observation of the two sets), single (uses the minimum of the distances between all observations of the two sets).
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.clustering.agglomerative_coefficient import agglomerative_coefficient
            prop = {
                'predictors': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'max_clusters': 6,
                'affinity': 'euclidean',
                'linkage': 'ward'
            }
            agglomerative_coefficient(input_dataset_path='/path/to/myDataset.csv',
                                      output_results_path='/path/to/newTable.csv',
                                      output_plot_path='/path/to/newPlot.png',
                                      properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn AgglomerativeClustering
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl#

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.predictors = properties.get('predictors', {})
        self.max_clusters = properties.get('max_clusters', 6)
        self.affinity = properties.get('affinity', 'euclidean')
        self.linkage = properties.get('linkage', 'ward')
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)
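    # Usage note (illustrative addition, not part of the original module): besides the
    # agglomerative_coefficient() convenience function defined below, the building block
    # can be run by instantiating the class directly and calling launch(), e.g.
    #
    #     AgglomerativeCoefficient(input_dataset_path='/path/to/myDataset.csv',
    #                              output_results_path='/path/to/newTable.csv',
    #                              properties={'predictors': {'columns': ['column1', 'column2']}}).launch()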

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` object."""

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.predictors:
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # the features are the predictors
        predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__)
        fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log)

        # Hopkins test
        H = hopkins(predictors)
        fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log)

        # scale dataset
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            predictors = scaler.fit_transform(predictors)

        # calculate silhouette
        silhouette_list, s_list = getSilhouetthe(method='agglomerative', X=predictors, max_clusters=self.max_clusters, affinity=self.affinity, linkage=self.linkage)

        # silhouette table
        silhouette_table = pd.DataFrame(data={'cluster': np.arange(1, self.max_clusters + 1), 'SILHOUETTE': silhouette_list})
        fu.log('Calculating Silhouette for each cluster\n\nSILHOUETTE TABLE\n\n%s\n' % silhouette_table.to_string(index=False), self.out_log, self.global_log)

        # get best cluster according to the silhouette method
        key = silhouette_list.index(max(silhouette_list))
        best_s = s_list[key]
        fu.log('Optimal number of clusters according to the Silhouette Method is %d' % best_s, self.out_log, self.global_log)

        # save results table
        results_table = pd.DataFrame(data={'method': ['silhouette'], 'coefficient': [max(silhouette_list)], 'cluster': [best_s]})
        fu.log('Gathering results\n\nRESULTS TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)
        fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        results_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        # silhouette plot
        if self.io_dict["out"]["output_plot_path"]:
            fu.log('Saving methods plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot = plotAgglomerativeTrain(self.max_clusters, silhouette_list, best_s)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

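# Illustrative sketch (added for clarity, not part of the original module): for the
# 'agglomerative' method, the getSilhouetthe() helper imported from
# biobb_ml.clustering.common is assumed to behave roughly like the loop below -- fit
# sklearn's AgglomerativeClustering for each candidate number of clusters and score the
# resulting partition with sklearn.metrics.silhouette_score (only defined for k >= 2).
# The function name, signature and starting k here are assumptions, not the actual
# implementation; the real helper also forwards the affinity/metric setting.
def _silhouette_sweep_sketch(X, max_clusters=6, linkage='ward'):
    from sklearn.cluster import AgglomerativeClustering
    from sklearn.metrics import silhouette_score
    scores, n_clusters = [], []
    for k in range(2, max_clusters + 1):
        # cluster with k groups and score the labelling (euclidean metric by default)
        labels = AgglomerativeClustering(n_clusters=k, linkage=linkage).fit_predict(X)
        scores.append(silhouette_score(X, labels))
        n_clusters.append(k)
    # best k = the candidate with the highest silhouette coefficient
    best_k = n_clusters[scores.index(max(scores))]
    return scores, n_clusters, best_k
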

def agglomerative_coefficient(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Execute the :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` class and
    execute the :meth:`launch() <clustering.agglomerative_coefficient.AgglomerativeCoefficient.launch>` method."""

    return AgglomerativeCoefficient(input_dataset_path=input_dataset_path,
                                    output_results_path=output_results_path,
                                    output_plot_path=output_plot_path,
                                    properties=properties, **kwargs).launch()


def main():
    """Command line execution of this building block. Please check the command line documentation."""
    parser = argparse.ArgumentParser(description="Wrapper of the scikit-learn AgglomerativeClustering method.", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
    parser.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    required_args = parser.add_argument_group('required arguments')
    required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
    required_args.add_argument('--output_results_path', required=True, help='Path to the silhouette coefficient table. Accepted formats: csv.')
    parser.add_argument('--output_plot_path', required=False, help='Path to the silhouette coefficient plot. Accepted formats: png.')

    args = parser.parse_args()
    args.config = args.config or "{}"
    properties = settings.ConfReader(config=args.config).get_prop_dic()

    # Specific call of each building block
    agglomerative_coefficient(input_dataset_path=args.input_dataset_path,
                              output_results_path=args.output_results_path,
                              output_plot_path=args.output_plot_path,
                              properties=properties)


if __name__ == '__main__':
    main()
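# Example command-line invocation (illustrative file names; assumes biobb_ml is installed
# so the module can be run with `python -m`, and that the optional --config file holds the
# properties described in the class docstring):
#
#     python -m biobb_ml.clustering.agglomerative_coefficient \
#         --config config_agglomerative_coefficient.yml \
#         --input_dataset_path dataset_agglomerative_coefficient.csv \
#         --output_results_path output_results.csv \
#         --output_plot_path output_plot.png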