Coverage for biobb_ml/clustering/agglomerative_clustering.py: 83%

90 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-07 09:39 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the AgglClustering class and the command line interface.""" 

4import argparse 

5import pandas as pd 

6from biobb_common.generic.biobb_object import BiobbObject 

7from sklearn.preprocessing import StandardScaler 

8from sklearn.cluster import AgglomerativeClustering 

9from biobb_common.configuration import settings 

10from biobb_common.tools import file_utils as fu 

11from biobb_common.tools.file_utils import launchlogger 

12from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, plotCluster 

13 

14 

class AgglClustering(BiobbObject):
    """
    | biobb_ml AgglClustering
    | Wrapper of the scikit-learn AgglomerativeClustering method.
    | Clusters a given dataset. Visit the `AgglomerativeClustering documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_agglomerative_clustering.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Path to the clustered dataset. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_agglomerative_clustering.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the clustering plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_agglomerative_clustering.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **clusters** (*int*) - (3) [1~100|1] The number of clusters to form as well as the number of centroids to generate.
            * **affinity** (*str*) - ("euclidean") Metric used to compute the linkage. If linkage is "ward", only "euclidean" is accepted. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), l1, l2, manhattan (Compute the Manhattan distance), cosine (Compute the Cosine distance between 1-D arrays), precomputed (means that the flatten array containing the upper triangular of the distance matrix of the original data is used).
            * **linkage** (*str*) - ("ward") The linkage criterion determines which distance to use between sets of observation. The algorithm will merge the pairs of cluster that minimize this criterion. Values: ward (minimizes the variance of the clusters being merged), complete (uses the maximum distances between all observations of the two sets), average (uses the average of the distances of each observation of the two sets), single (uses the minimum of the distances between all observations of the two sets).
            * **plots** (*list*) - (None) List of dictionaries with all plots you want to generate. Only 2D or 3D plots accepted. Format: [ { 'title': 'Plot 1', 'features': ['feat1', 'feat2'] } ].
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.clustering.agglomerative_clustering import agglomerative_clustering
            prop = {
                'predictors': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'clusters': 3,
                'affinity': 'euclidean',
                'linkage': 'ward',
                'plots': [
                    {
                        'title': 'Plot 1',
                        'features': ['feat1', 'feat2']
                    }
                ]
            }
            agglomerative_clustering(input_dataset_path='/path/to/myDataset.csv',
                                    output_results_path='/path/to/newTable.csv',
                                    output_plot_path='/path/to/newPlot.png',
                                    properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn AgglomerativeClustering
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.predictors = properties.get('predictors', {})
        self.clusters = properties.get('clusters', 3)
        self.affinity = properties.get('affinity', 'euclidean')
        self.linkage = properties.get('linkage', 'ward')
        self.plots = properties.get('plots', [])
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
        # output_plot_path is optional, so only validate it when given
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`AgglClustering <clustering.agglomerative_clustering.AgglClustering>` clustering.agglomerative_clustering.AgglClustering object."""

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset; when predictors are given by column name, the first CSV
        # row is the header, so skip it and reuse it as the column labels
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.predictors:
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # the features are the predictors
        predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__)
        fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log)

        # Hopkins test (clustering tendency of the dataset)
        H = hopkins(predictors)
        fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log)

        # scale dataset
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            predictors = scaler.fit_transform(predictors)

        # create an agglomerative clustering object with self.clusters clusters
        # NOTE(review): sklearn renamed 'affinity' to 'metric' in 1.2 and removed
        # 'affinity' in 1.4 — confirm the pinned sklearn version before upgrading.
        model = AgglomerativeClustering(n_clusters=self.clusters, affinity=self.affinity, linkage=self.linkage)

        # create a copy of data, so we can see the clusters next to the original data
        clusters = data.copy()
        # fit the model and assign a cluster label to each observation
        # (single fit_predict call; a separate fit() beforehand would fit the model twice)
        clusters['cluster'] = model.fit_predict(predictors)

        fu.log('Calculating results\n\nCLUSTERING TABLE\n\n%s\n' % clusters, self.out_log, self.global_log)

        # save results
        fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        clusters.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        if self.io_dict["out"]["output_plot_path"] and self.plots:
            # keep at most six plots, only those with 2 or 3 features (2D/3D)
            new_plots = [plot for plot in self.plots if len(plot['features']) in (2, 3)][:6]

            plot = plotCluster(new_plots, clusters)
            fu.log('Saving output plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # Copy files to host
        self.copy_to_host()

        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0

180 

181 

def agglomerative_clustering(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Execute the :class:`AgglClustering <clustering.agglomerative_clustering.AgglClustering>` class and
    execute the :meth:`launch() <clustering.agglomerative_clustering.AgglClustering.launch>` method."""

    # Build the building-block object, then run it and return its exit code.
    block = AgglClustering(
        input_dataset_path=input_dataset_path,
        output_results_path=output_results_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs,
    )
    return block.launch()

190 

191 

def main():
    """Command line execution of this building block. Please check the command line documentation."""
    arg_parser = argparse.ArgumentParser(
        description="Wrapper of the scikit-learn AgglomerativeClustering method. ",
        formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
    )
    arg_parser.add_argument('--config', required=False, help='Configuration file')

    # Specific args of each building block
    required_args = arg_parser.add_argument_group('required arguments')
    required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
    required_args.add_argument('--output_results_path', required=True, help='Path to the clustered dataset. Accepted formats: csv.')
    arg_parser.add_argument('--output_plot_path', required=False, help='Path to the clustering plot. Accepted formats: png.')

    args = arg_parser.parse_args()
    # Fall back to an empty JSON config when --config is not supplied
    if not args.config:
        args.config = "{}"
    properties = settings.ConfReader(config=args.config).get_prop_dic()

    # Specific call of each building block
    agglomerative_clustering(
        input_dataset_path=args.input_dataset_path,
        output_results_path=args.output_results_path,
        output_plot_path=args.output_plot_path,
        properties=properties,
    )

212 

213 

# Script entry point: run the command-line interface when executed directly.
if __name__ == '__main__':
    main()