Coverage for biobb_ml/clustering/agglomerative_coefficient.py: 83%

83 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the AgglomerativeCoefficient class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6import numpy as np 

7from biobb_common.generic.biobb_object import BiobbObject 

8from sklearn.preprocessing import StandardScaler 

9from biobb_common.configuration import settings 

10from biobb_common.tools import file_utils as fu 

11from biobb_common.tools.file_utils import launchlogger 

12from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, getSilhouetthe, plotAgglomerativeTrain 

13 

14 

class AgglomerativeCoefficient(BiobbObject):
    """
    | biobb_ml AgglomerativeCoefficient
    | Wrapper of the scikit-learn AgglomerativeClustering method.
    | Clusters a given dataset and calculates best K coefficient. Visit the `AgglomerativeClustering documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_agglomerative_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Path to the silhouette results table (method, coefficient and best number of clusters). File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_agglomerative_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the silhouette method plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_agglomerative_coefficient.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **max_clusters** (*int*) - (6) [1~100|1] Maximum number of clusters to evaluate when looking for the best K.
            * **affinity** (*str*) - ("euclidean") Metric used to compute the linkage. If linkage is "ward", only "euclidean" is accepted. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), l1, l2, manhattan (Compute the Manhattan distance), cosine (Compute the Cosine distance between 1-D arrays), precomputed (means that the flatten array containing the upper triangular of the distance matrix of the original data is used).
            * **linkage** (*str*) - ("ward") The linkage criterion determines which distance to use between sets of observation. The algorithm will merge the pairs of cluster that minimize this criterion. Values: ward (minimizes the variance of the clusters being merged), complete (uses the maximum distances between all observations of the two sets), average (uses the average of the distances of each observation of the two sets), single (uses the minimum of the distances between all observations of the two sets).
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
            * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.clustering.agglomerative_coefficient import agglomerative_coefficient
            prop = {
                'predictors': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'max_clusters': 3,
                'affinity': 'euclidean',
                'linkage': 'ward'
            }
            agglomerative_coefficient(input_dataset_path='/path/to/myDataset.csv',
                                      output_results_path='/path/to/newTable.csv',
                                      output_plot_path='/path/to/newPlot.png',
                                      properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn AgglomerativeClustering
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl#

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        """Store the input/output paths and the block-specific properties.

        Args:
            input_dataset_path: Path to the input dataset.
            output_results_path: Path where the results table is written.
            output_plot_path: Optional path where the plot is written.
            properties: Dictionary with the tool parameters; ``None`` is
                treated as an empty dictionary.
            **kwargs: Accepted for interface compatibility; not read here
                (they are only captured in ``locals_var_dict``).
        """
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.predictors = properties.get('predictors', {})
        self.max_clusters = properties.get('max_clusters', 6)
        self.affinity = properties.get('affinity', 'euclidean')
        self.linkage = properties.get('linkage', 'ward')
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
        # The plot is optional, so it is only validated when a path was given
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` clustering.agglomerative_coefficient.AgglomerativeCoefficient object.

        Loads the dataset, logs the Hopkins statistic, optionally scales the
        predictors, computes the silhouette values up to ``max_clusters`` and
        writes the results table (and the plot, when a plot path was given).

        Returns:
            int: 0, both when the run completes and when ``check_restart()``
            makes it return early.
        """

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.predictors:
            # predictors given by name: take the names from the header and skip that row when reading
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # the features are the predictors
        predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__)
        fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log)

        # Hopkins test
        H = hopkins(predictors)
        fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log)

        # scale dataset
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            predictors = scaler.fit_transform(predictors)

        # calculate silhouette
        silhouette_list, s_list = getSilhouetthe(method='agglomerative', X=predictors, max_clusters=self.max_clusters, affinity=self.affinity, linkage=self.linkage)

        # silhouette table
        silhouette_table = pd.DataFrame(data={'cluster': np.arange(1, self.max_clusters + 1), 'SILHOUETTE': silhouette_list})
        fu.log('Calculating Silhouette for each cluster\n\nSILHOUETTE TABLE\n\n%s\n' % silhouette_table.to_string(index=False), self.out_log, self.global_log)

        # get best cluster silhouette method
        best_coefficient = max(silhouette_list)
        key = silhouette_list.index(best_coefficient)
        # NOTE(review): s_list is indexed with a position taken from silhouette_list,
        # so both sequences are assumed to be index-aligned - confirm against getSilhouetthe.
        best_s = s_list[key]
        fu.log('Optimal number of clusters according to the Silhouette Method is %d' % best_s, self.out_log, self.global_log)

        # save results table
        results_table = pd.DataFrame(data={'method': ['silhouette'], 'coefficient': [best_coefficient], 'cluster': [best_s]})
        fu.log('Gathering results\n\nRESULTS TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)
        fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        results_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        # silhouette plot (only when a plot path was provided)
        if self.io_dict["out"]["output_plot_path"]:
            fu.log('Saving methods plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot = plotAgglomerativeTrain(self.max_clusters, silhouette_list, best_s)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

173 

174 

def agglomerative_coefficient(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Build an :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` object from the
    given arguments and return the result of its :meth:`launch() <clustering.agglomerative_coefficient.AgglomerativeCoefficient.launch>` method."""

    building_block = AgglomerativeCoefficient(
        input_dataset_path=input_dataset_path,
        output_results_path=output_results_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs
    )
    return building_block.launch()

183 

184 

def main():
    """Command line execution of this building block. Please check the command line documentation.

    Parses ``--config``, ``--input_dataset_path``, ``--output_results_path`` and
    the optional ``--output_plot_path``, reads the properties through
    ``settings.ConfReader`` and calls :func:`agglomerative_coefficient`.
    """
    # The help texts name the wrapped scikit-learn method (AgglomerativeClustering)
    # and the silhouette outputs this building block actually writes.
    parser = argparse.ArgumentParser(description="Wrapper of the scikit-learn AgglomerativeClustering method. ", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
    parser.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    required_args = parser.add_argument_group('required arguments')
    required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
    required_args.add_argument('--output_results_path', required=True, help='Path to the silhouette results table. Accepted formats: csv.')
    parser.add_argument('--output_plot_path', required=False, help='Path to the silhouette method plot. Accepted formats: png.')

    args = parser.parse_args()
    # When no configuration file is given, an empty JSON object is passed as configuration
    args.config = args.config or "{}"
    properties = settings.ConfReader(config=args.config).get_prop_dic()

    # Specific call of each building block
    agglomerative_coefficient(input_dataset_path=args.input_dataset_path,
                              output_results_path=args.output_results_path,
                              output_plot_path=args.output_plot_path,
                              properties=properties)

205 

206 

# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()