Coverage for biobb_ml/clustering/agglomerative_coefficient.py: 83%
84 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-07 09:39 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-07 09:39 +0000
1#!/usr/bin/env python3
3"""Module containing the AgglomerativeCoefficient class and the command line interface."""
4import argparse
5import pandas as pd
6import numpy as np
7from biobb_common.generic.biobb_object import BiobbObject
8from sklearn.preprocessing import StandardScaler
9from biobb_common.configuration import settings
10from biobb_common.tools import file_utils as fu
11from biobb_common.tools.file_utils import launchlogger
12from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, getSilhouetthe, plotAgglomerativeTrain
class AgglomerativeCoefficient(BiobbObject):
    """
    | biobb_ml AgglomerativeCoefficient
    | Wrapper of the scikit-learn AgglomerativeClustering method.
    | Clusters a given dataset and calculates best K coefficient. Visit the `AgglomerativeClustering documentation page <https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html>`_ in the sklearn official website for further information.

    Args:
        input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/clustering/dataset_agglomerative_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_results_path (str): Path to the results table (best silhouette coefficient and its number of clusters). File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_results_agglomerative_coefficient.csv>`_. Accepted formats: csv (edam:format_3752).
        output_plot_path (str) (Optional): Path to the silhouette method plot. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/clustering/ref_output_plot_agglomerative_coefficient.png>`_. Accepted formats: png (edam:format_3603).
        properties (dic - Python dictionary object containing the tool parameters, not input/output files):
            * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of multiple formats, the first one will be picked.
            * **max_clusters** (*int*) - (6) [1~100|1] Maximum number of clusters evaluated when looking for the best K.
            * **affinity** (*str*) - ("euclidean") Metric used to compute the linkage. If linkage is "ward", only "euclidean" is accepted. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), l1, l2, manhattan (Compute the Manhattan distance), cosine (Compute the Cosine distance between 1-D arrays), precomputed (means that the flatten array containing the upper triangular of the distance matrix of the original data is used).
            * **linkage** (*str*) - ("ward") The linkage criterion determines which distance to use between sets of observation. The algorithm will merge the pairs of cluster that minimize this criterion. Values: ward (minimizes the variance of the clusters being merged), complete (uses the maximum distances between all observations of the two sets), average (uses the average of the distances of each observation of the two sets), single (uses the minimum of the distances between all observations of the two sets).
            * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
            * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
            * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.

    Examples:
        This is a use example of how to use the building block from Python::

            from biobb_ml.clustering.agglomerative_coefficient import agglomerative_coefficient
            prop = {
                'predictors': {
                    'columns': [ 'column1', 'column2', 'column3' ]
                },
                'max_clusters': 3,
                'affinity': 'euclidean',
                'linkage': 'ward'
            }
            agglomerative_coefficient(input_dataset_path='/path/to/myDataset.csv',
                                      output_results_path='/path/to/newTable.csv',
                                      output_plot_path='/path/to/newPlot.png',
                                      properties=prop)

    Info:
        * wrapped_software:
            * name: scikit-learn AgglomerativeClustering
            * version: >=0.24.2
            * license: BSD 3-Clause
        * ontology:
            * name: EDAM
            * schema: http://edamontology.org/EDAM.owl#

    """

    def __init__(self, input_dataset_path, output_results_path,
                 output_plot_path=None, properties=None, **kwargs) -> None:
        properties = properties or {}

        # Call parent class constructor
        super().__init__(properties)
        # Snapshot of the constructor arguments; taken before any other local is created.
        self.locals_var_dict = locals().copy()

        # Input/Output files
        self.io_dict = {
            "in": {"input_dataset_path": input_dataset_path},
            "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
        }

        # Properties specific for BB
        self.predictors = properties.get('predictors', {})
        self.max_clusters = properties.get('max_clusters', 6)
        self.affinity = properties.get('affinity', 'euclidean')
        self.linkage = properties.get('linkage', 'ward')
        self.scale = properties.get('scale', False)
        self.properties = properties

        # Check the properties
        self.check_properties(properties)
        self.check_arguments()

    def check_data_params(self, out_log, err_log):
        """ Checks all the input/output paths and parameters """
        self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
        self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
        # The plot is optional: only validate its path when one was given.
        if self.io_dict["out"]["output_plot_path"]:
            self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)

    @launchlogger
    def launch(self) -> int:
        """Execute the :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` clustering.agglomerative_coefficient.AgglomerativeCoefficient object."""

        # check input/output paths and parameters
        self.check_data_params(self.out_log, self.err_log)

        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        # load dataset
        fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
        if 'columns' in self.predictors:
            # Predictors are addressed by name: take the names from the header
            # row and skip that row when reading the data.
            labels = getHeader(self.io_dict["in"]["input_dataset_path"])
            skiprows = 1
        else:
            labels = None
            skiprows = None
        data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)

        # the features are the predictors
        predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__)
        fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log)

        # Hopkins test
        H = hopkins(predictors)
        fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log)

        # scale dataset
        if self.scale:
            fu.log('Scaling dataset', self.out_log, self.global_log)
            scaler = StandardScaler()
            predictors = scaler.fit_transform(predictors)

        # calculate silhouette
        silhouette_list, s_list = getSilhouetthe(method='agglomerative', X=predictors, max_clusters=self.max_clusters, affinity=self.affinity, linkage=self.linkage)

        # silhouette table
        # Label each coefficient with the cluster count returned next to it, so the
        # logged table always agrees with the optimum reported below (which is read
        # from the same s_list).
        silhouette_table = pd.DataFrame(data={'cluster': s_list, 'SILHOUETTE': silhouette_list})
        fu.log('Calculating Silhouette for each cluster\n\nSILHOUETTE TABLE\n\n%s\n' % silhouette_table.to_string(index=False), self.out_log, self.global_log)

        # get best cluster silhouette method
        best_coefficient = max(silhouette_list)
        key = silhouette_list.index(best_coefficient)
        best_s = s_list[key]
        fu.log('Optimal number of clusters according to the Silhouette Method is %d' % best_s, self.out_log, self.global_log)

        # save results table
        results_table = pd.DataFrame(data={'method': ['silhouette'], 'coefficient': [best_coefficient], 'cluster': [best_s]})
        fu.log('Gathering results\n\nRESULTS TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)
        fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
        results_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')

        # silhouette plot (optional output)
        if self.io_dict["out"]["output_plot_path"]:
            fu.log('Saving methods plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
            plot = plotAgglomerativeTrain(self.max_clusters, silhouette_list, best_s)
            plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)

        # Copy files to host
        self.copy_to_host()

        # Register the staging directory for removal
        self.tmp_files.extend([
            self.stage_io_dict.get("unique_dir")
        ])
        self.remove_tmp_files()

        self.check_arguments(output_files_created=True, raise_exception=False)

        return 0
def agglomerative_coefficient(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
    """Build an :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` object
    from the given arguments and return the result of its
    :meth:`launch() <clustering.agglomerative_coefficient.AgglomerativeCoefficient.launch>` method."""

    building_block = AgglomerativeCoefficient(
        input_dataset_path=input_dataset_path,
        output_results_path=output_results_path,
        output_plot_path=output_plot_path,
        properties=properties,
        **kwargs
    )
    return building_block.launch()
def main():
    """Parse the command line arguments and run the building block with them."""

    def _wide_formatter(prog):
        # Very large width so that help texts are never wrapped.
        return argparse.RawTextHelpFormatter(prog, width=99999)

    parser = argparse.ArgumentParser(
        description="Wrapper of the scikit-learn AgglomerativeCoefficient method. ",
        formatter_class=_wide_formatter
    )
    parser.add_argument('--config', required=False, help='Configuration file')

    # Arguments specific to this building block
    required_args = parser.add_argument_group('required arguments')
    required_args.add_argument(
        '--input_dataset_path',
        required=True,
        help='Path to the input dataset. Accepted formats: csv.'
    )
    required_args.add_argument(
        '--output_results_path',
        required=True,
        help='Path to the gap values list. Accepted formats: csv.'
    )
    parser.add_argument(
        '--output_plot_path',
        required=False,
        help='Path to the elbow and gap methods plot. Accepted formats: png.'
    )

    args = parser.parse_args()
    # Fall back to an empty JSON configuration when --config is not supplied.
    if not args.config:
        args.config = "{}"
    config_reader = settings.ConfReader(config=args.config)
    properties = config_reader.get_prop_dic()

    # Run the building block with the parsed paths and properties
    agglomerative_coefficient(
        input_dataset_path=args.input_dataset_path,
        output_results_path=args.output_results_path,
        output_plot_path=args.output_plot_path,
        properties=properties
    )


if __name__ == '__main__':
    main()