Coverage for biobb_ml/clustering/agglomerative_clustering.py: 83%

89 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the AgglClustering class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6from biobb_common.generic.biobb_object import BiobbObject 

7from sklearn.preprocessing import StandardScaler 

8from sklearn.cluster import AgglomerativeClustering 

9from biobb_common.configuration import settings 

10from biobb_common.tools import file_utils as fu 

11from biobb_common.tools.file_utils import launchlogger 

12from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, plotCluster 

13 

14 

15class AgglClustering(BiobbObject): 

16 """ 

17 | biobb_ml AgglClustering 

18 | Wrapper of the scikit-learn AgglomerativeClustering method. 

19 | Clusters a given dataset. Visit the `AgglomerativeClustering documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html>`_ in the sklearn official website for further information. 

20 

21 Args: 

22 input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_agglomerative_clustering.csv>`_. Accepted formats: csv (edam:format_3752). 

23 output_results_path (str): Path to the clustered dataset. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_agglomerative_clustering.csv>`_. Accepted formats: csv (edam:format_3752). 

24 output_plot_path (str) (Optional): Path to the clustering plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_agglomerative_clustering.png>`_. Accepted formats: png (edam:format_3603). 

25 properties (dic - Python dictionary object containing the tool parameters, not input/output files): 

26 * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked. 

27 * **clusters** (*int*) - (3) [1~100|1] The number of clusters to form as well as the number of centroids to generate. 

28 * **affinity** (*str*) - ("euclidean") Metric used to compute the linkage. If linkage is "ward", only "euclidean" is accepted. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), l1, l2, manhattan (Compute the Manhattan distance), cosine (Compute the Cosine distance between 1-D arrays), precomputed (means that the flatten array containing the upper triangular of the distance matrix of the original data is used). 

29 * **linkage** (*str*) - ("ward") The linkage criterion determines which distance to use between sets of observation. The algorithm will merge the pairs of cluster that minimize this criterion. Values: ward (minimizes the variance of the clusters being merged), complete (uses the maximum distances between all observations of the two sets), average (uses the average of the distances of each observation of the two sets), single (uses the minimum of the distances between all observations of the two sets). 

30 * **plots** (*list*) - (None) List of dictionaries with all plots you want to generate. Only 2D or 3D plots accepted. Format: [ { 'title': 'Plot 1', 'features': ['feat1', 'feat2'] } ]. 

31 * **scale** (*bool*) - (False) Whether or not to scale the input dataset. 

32 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

33 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

34 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

35 

36 Examples: 

37 This is a use example of how to use the building block from Python:: 

38 

39 from biobb_ml.clustering.agglomerative_clustering import agglomerative_clustering 

40 prop = { 

41 'predictors': { 

42 'columns': [ 'column1', 'column2', 'column3' ] 

43 }, 

44 'clusters': 3, 

45 'affinity': 'euclidean', 

46 'linkage': 'ward', 

47 'plots': [ 

48 { 

49 'title': 'Plot 1', 

50 'features': ['feat1', 'feat2'] 

51 } 

52 ] 

53 } 

54 agglomerative_clustering(input_dataset_path='/path/to/myDataset.csv', 

55 output_results_path='/path/to/newTable.csv', 

56 output_plot_path='/path/to/newPlot.png', 

57 properties=prop) 

58 

59 Info: 

60 * wrapped_software: 

61 * name: scikit-learn AgglomerativeClustering 

62 * version: >=0.24.2 

63 * license: BSD 3-Clause 

64 * ontology: 

65 * name: EDAM 

66 * schema: http://edamontology.org/EDAM.owl 

67 

68 """ 

69 

70 def __init__(self, input_dataset_path, output_results_path, 

71 output_plot_path=None, properties=None, **kwargs) -> None: 

72 properties = properties or {} 

73 

74 # Call parent class constructor 

75 super().__init__(properties) 

76 self.locals_var_dict = locals().copy() 

77 

78 # Input/Output files 

79 self.io_dict = { 

80 "in": {"input_dataset_path": input_dataset_path}, 

81 "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path} 

82 } 

83 

84 # Properties specific for BB 

85 self.predictors = properties.get('predictors', {}) 

86 self.clusters = properties.get('clusters', 3) 

87 self.affinity = properties.get('affinity', 'euclidean') 

88 self.linkage = properties.get('linkage', 'ward') 

89 self.plots = properties.get('plots', []) 

90 self.scale = properties.get('scale', False) 

91 self.properties = properties 

92 

93 # Check the properties 

94 self.check_properties(properties) 

95 self.check_arguments() 

96 

97 def check_data_params(self, out_log, err_log): 

98 """ Checks all the input/output paths and parameters """ 

99 self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__) 

100 self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__) 

101 if self.io_dict["out"]["output_plot_path"]: 

102 self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__) 

103 

104 @launchlogger 

105 def launch(self) -> int: 

106 """Execute the :class:`AgglClustering <clustering.agglomerative_clustering.AgglClustering>` clustering.agglomerative_clustering.AgglClustering object.""" 

107 

108 # check input/output paths and parameters 

109 self.check_data_params(self.out_log, self.err_log) 

110 

111 # Setup Biobb 

112 if self.check_restart(): 

113 return 0 

114 self.stage_files() 

115 

116 # load dataset 

117 fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log) 

118 if 'columns' in self.predictors: 

119 labels = getHeader(self.io_dict["in"]["input_dataset_path"]) 

120 skiprows = 1 

121 else: 

122 labels = None 

123 skiprows = None 

124 data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels) 

125 

126 # the features are the predictors 

127 predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__) 

128 fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log) 

129 

130 # Hopkins test 

131 H = hopkins(predictors) 

132 fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log) 

133 

134 # scale dataset 

135 if self.scale: 

136 fu.log('Scaling dataset', self.out_log, self.global_log) 

137 scaler = StandardScaler() 

138 predictors = scaler.fit_transform(predictors) 

139 

140 # create an agglomerative clustering object with self.clusters clusters 

141 model = AgglomerativeClustering(n_clusters=self.clusters, affinity=self.affinity, linkage=self.linkage) 

142 # fit the data 

143 model.fit(predictors) 

144 

145 # create a copy of data, so we can see the clusters next to the original data 

146 clusters = data.copy() 

147 # predict the cluster for each observation 

148 clusters['cluster'] = model.fit_predict(predictors) 

149 

150 fu.log('Calculating results\n\nCLUSTERING TABLE\n\n%s\n' % clusters, self.out_log, self.global_log) 

151 

152 # save results 

153 fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log) 

154 clusters.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f') 

155 

156 if self.io_dict["out"]["output_plot_path"] and self.plots: 

157 new_plots = [] 

158 i = 0 

159 for plot in self.plots: 

160 if len(plot['features']) == 2 or len(plot['features']) == 3: 

161 new_plots.append(plot) 

162 i += 1 

163 if i == 6: 

164 break 

165 

166 plot = plotCluster(new_plots, clusters) 

167 fu.log('Saving output plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log) 

168 plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150) 

169 

170 # Copy files to host 

171 self.copy_to_host() 

172 

173 self.tmp_files.extend([ 

174 self.stage_io_dict.get("unique_dir") 

175 ]) 

176 self.remove_tmp_files() 

177 

178 self.check_arguments(output_files_created=True, raise_exception=False) 

179 

180 return 0 

181 

182 

183def agglomerative_clustering(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int: 

184 """Execute the :class:`AgglClustering <clustering.agglomerative_clustering.AgglClustering>` class and 

185 execute the :meth:`launch() <clustering.agglomerative_clustering.AgglClustering.launch>` method.""" 

186 

187 return AgglClustering(input_dataset_path=input_dataset_path, 

188 output_results_path=output_results_path, 

189 output_plot_path=output_plot_path, 

190 properties=properties, **kwargs).launch() 

191 

192 

193def main(): 

194 """Command line execution of this building block. Please check the command line documentation.""" 

195 parser = argparse.ArgumentParser(description="Wrapper of the scikit-learn AgglomerativeClustering method. ", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

196 parser.add_argument('--config', required=False, help='Configuration file') 

197 

198 # Specific args of each building block 

199 required_args = parser.add_argument_group('required arguments') 

200 required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.') 

201 required_args.add_argument('--output_results_path', required=True, help='Path to the clustered dataset. Accepted formats: csv.') 

202 parser.add_argument('--output_plot_path', required=False, help='Path to the clustering plot. Accepted formats: png.') 

203 

204 args = parser.parse_args() 

205 args.config = args.config or "{}" 

206 properties = settings.ConfReader(config=args.config).get_prop_dic() 

207 

208 # Specific call of each building block 

209 agglomerative_clustering(input_dataset_path=args.input_dataset_path, 

210 output_results_path=args.output_results_path, 

211 output_plot_path=args.output_plot_path, 

212 properties=properties) 

213 

214 

215if __name__ == '__main__': 

216 main()