Coverage for biobb_ml/clustering/ 83%
83 statements
« prev ^ index » next v7.6.1, created at 2024-10-03 14:57 +0000
« prev ^ index » next v7.6.1, created at 2024-10-03 14:57 +0000
1#!/usr/bin/env python3
3"""Module containing the AgglomerativeCoefficient class and the command line interface."""
4import argparse
5import pandas as pd
6import numpy as np
7from biobb_common.generic.biobb_object import BiobbObject
8from sklearn.preprocessing import StandardScaler
9from biobb_common.configuration import settings
10from import file_utils as fu
11from import launchlogger
12from biobb_ml.clustering.common import check_input_path, check_output_path, getHeader, getIndependentVars, getIndependentVarsList, hopkins, getSilhouetthe, plotAgglomerativeTrain
15class AgglomerativeCoefficient(BiobbObject):
16 """
17 | biobb_ml AgglomerativeCoefficient
18 | Wrapper of the scikit-learn AgglomerativeClustering method.
19 | Clusters a given dataset and calculates best K coefficient. Visit the `AgglomerativeClustering documentation page <>`_ in the sklearn official website for further information.
21 Args:
22 input_dataset_path (str): Path to the input dataset. File type: input. `Sample file <>`_. Accepted formats: csv (edam:format_3752).
23 output_results_path (str): Path to the gap values list. File type: output. `Sample file <>`_. Accepted formats: csv (edam:format_3752).
24 output_plot_path (str) (Optional): Path to the elbow method and gap statistics plot. File type: output. `Sample file <>`_. Accepted formats: png (edam:format_3603).
25 properties (dic - Python dictionary object containing the tool parameters, not input/output files):
26 * **predictors** (*dict*) - ({}) Features or columns from your dataset you want to use for fitting. You can specify either a list of columns names from your input dataset, a list of columns indexes or a range of columns indexes. Formats: { "columns": ["column1", "column2"] } or { "indexes": [0, 2, 3, 10, 11, 17] } or { "range": [[0, 20], [50, 102]] }. In case of mulitple formats, the first one will be picked.
27 * **max_clusters** (*int*) - (6) [1~100|1] Maximum number of clusters to use by default for kmeans queries.
28 * **affinity** (*str*) - ("euclidean") Metric used to compute the linkage. If linkage is "ward", only "euclidean" is accepted. Values: euclidean (Computes the Euclidean distance between two 1-D arrays), l1, l2, manhattan (Compute the Manhattan distance), cosine (Compute the Cosine distance between 1-D arrays), precomputed (means that the flatten array containing the upper triangular of the distance matrix of the original data is used).
29 * **linkage** (*str*) - ("ward") The linkage criterion determines which distance to use between sets of observation. The algorithm will merge the pairs of cluster that minimize this criterion. Values: ward (minimizes the variance of the clusters being merged), complete (uses the maximum distances between all observations of the two sets), average (uses the average of the distances of each observation of the two sets), single (uses the minimum of the distances between all observations of the two sets).
30 * **scale** (*bool*) - (False) Whether or not to scale the input dataset.
31 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
32 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
33 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
35 Examples:
36 This is a use example of how to use the building block from Python::
38 from biobb_ml.clustering.agglomerative_coefficient import agglomerative_coefficient
39 prop = {
40 'predictors': {
41 'columns': [ 'column1', 'column2', 'column3' ]
42 },
43 'clusters': 3,
44 'affinity': 'euclidean',
45 'linkage': 'ward',
46 'plots': [
47 {
48 'title': 'Plot 1',
49 'features': ['feat1', 'feat2']
50 }
51 ]
52 }
53 agglomerative_coefficient(input_dataset_path='/path/to/myDataset.csv',
54 output_results_path='/path/to/newTable.csv',
55 output_plot_path='/path/to/newPlot.png',
56 properties=prop)
58 Info:
59 * wrapped_software:
60 * name: scikit-learn AgglomerativeClustering
61 * version: >=0.24.2
62 * license: BSD 3-Clause
63 * ontology:
64 * name: EDAM
65 * schema:
67 """
69 def __init__(self, input_dataset_path, output_results_path,
70 output_plot_path=None, properties=None, **kwargs) -> None:
71 properties = properties or {}
73 # Call parent class constructor
74 super().__init__(properties)
75 self.locals_var_dict = locals().copy()
77 # Input/Output files
78 self.io_dict = {
79 "in": {"input_dataset_path": input_dataset_path},
80 "out": {"output_results_path": output_results_path, "output_plot_path": output_plot_path}
81 }
83 # Properties specific for BB
84 self.predictors = properties.get('predictors', {})
85 self.max_clusters = properties.get('max_clusters', 6)
86 self.affinity = properties.get('affinity', 'euclidean')
87 self.linkage = properties.get('linkage', 'ward')
88 self.scale = properties.get('scale', False)
89 = properties
91 # Check the properties
92 self.check_properties(properties)
93 self.check_arguments()
95 def check_data_params(self, out_log, err_log):
96 """ Checks all the input/output paths and parameters """
97 self.io_dict["in"]["input_dataset_path"] = check_input_path(self.io_dict["in"]["input_dataset_path"], "input_dataset_path", out_log, self.__class__.__name__)
98 self.io_dict["out"]["output_results_path"] = check_output_path(self.io_dict["out"]["output_results_path"], "output_results_path", False, out_log, self.__class__.__name__)
99 if self.io_dict["out"]["output_plot_path"]:
100 self.io_dict["out"]["output_plot_path"] = check_output_path(self.io_dict["out"]["output_plot_path"], "output_plot_path", True, out_log, self.__class__.__name__)
102 @launchlogger
103 def launch(self) -> int:
104 """Execute the :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` clustering.agglomerative_coefficient.AgglomerativeCoefficient object."""
106 # check input/output paths and parameters
107 self.check_data_params(self.out_log, self.err_log)
109 # Setup Biobb
110 if self.check_restart():
111 return 0
112 self.stage_files()
114 # load dataset
115 fu.log('Getting dataset from %s' % self.io_dict["in"]["input_dataset_path"], self.out_log, self.global_log)
116 if 'columns' in self.predictors:
117 labels = getHeader(self.io_dict["in"]["input_dataset_path"])
118 skiprows = 1
119 else:
120 labels = None
121 skiprows = None
122 data = pd.read_csv(self.io_dict["in"]["input_dataset_path"], header=None, sep="\\s+|;|:|,|\t", engine="python", skiprows=skiprows, names=labels)
124 # the features are the predictors
125 predictors = getIndependentVars(self.predictors, data, self.out_log, self.__class__.__name__)
126 fu.log('Predictors: [%s]' % (getIndependentVarsList(self.predictors)), self.out_log, self.global_log)
128 # Hopkins test
129 H = hopkins(predictors)
130 fu.log('Performing Hopkins test over dataset. H = %f' % H, self.out_log, self.global_log)
132 # scale dataset
133 if self.scale:
134 fu.log('Scaling dataset', self.out_log, self.global_log)
135 scaler = StandardScaler()
136 predictors = scaler.fit_transform(predictors)
138 # calculate silhouette
139 silhouette_list, s_list = getSilhouetthe(method='agglomerative', X=predictors, max_clusters=self.max_clusters, affinity=self.affinity, linkage=self.linkage)
141 # silhouette table
142 silhouette_table = pd.DataFrame(data={'cluster': np.arange(1, self.max_clusters + 1), 'SILHOUETTE': silhouette_list})
143 fu.log('Calculating Silhouette for each cluster\n\nSILHOUETTE TABLE\n\n%s\n' % silhouette_table.to_string(index=False), self.out_log, self.global_log)
145 # get best cluster silhouette method
146 key = silhouette_list.index(max(silhouette_list))
147 best_s = s_list.__getitem__(key)
148 fu.log('Optimal number of clusters according to the Silhouette Method is %d' % best_s, self.out_log, self.global_log)
150 # save results table
151 results_table = pd.DataFrame(data={'method': ['silhouette'], 'coefficient': [max(silhouette_list)], 'cluster': [best_s]})
152 fu.log('Gathering results\n\nRESULTS TABLE\n\n%s\n' % results_table.to_string(index=False), self.out_log, self.global_log)
153 fu.log('Saving results to %s' % self.io_dict["out"]["output_results_path"], self.out_log, self.global_log)
154 results_table.to_csv(self.io_dict["out"]["output_results_path"], index=False, header=True, float_format='%.3f')
156 # wcss plot
157 if self.io_dict["out"]["output_plot_path"]:
158 fu.log('Saving methods plot to %s' % self.io_dict["out"]["output_plot_path"], self.out_log, self.global_log)
159 plot = plotAgglomerativeTrain(self.max_clusters, silhouette_list, best_s)
160 plot.savefig(self.io_dict["out"]["output_plot_path"], dpi=150)
162 # Copy files to host
163 self.copy_to_host()
165 self.tmp_files.extend([
166 self.stage_io_dict.get("unique_dir")
167 ])
168 self.remove_tmp_files()
170 self.check_arguments(output_files_created=True, raise_exception=False)
172 return 0
175def agglomerative_coefficient(input_dataset_path: str, output_results_path: str, output_plot_path: str = None, properties: dict = None, **kwargs) -> int:
176 """Execute the :class:`AgglomerativeCoefficient <clustering.agglomerative_coefficient.AgglomerativeCoefficient>` class and
177 execute the :meth:`launch() <clustering.agglomerative_coefficient.AgglomerativeCoefficient.launch>` method."""
179 return AgglomerativeCoefficient(input_dataset_path=input_dataset_path,
180 output_results_path=output_results_path,
181 output_plot_path=output_plot_path,
182 properties=properties, **kwargs).launch()
185def main():
186 """Command line execution of this building block. Please check the command line documentation."""
187 parser = argparse.ArgumentParser(description="Wrapper of the scikit-learn AgglomerativeCoefficient method. ", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
188 parser.add_argument('--config', required=False, help='Configuration file')
190 # Specific args of each building block
191 required_args = parser.add_argument_group('required arguments')
192 required_args.add_argument('--input_dataset_path', required=True, help='Path to the input dataset. Accepted formats: csv.')
193 required_args.add_argument('--output_results_path', required=True, help='Path to the gap values list. Accepted formats: csv.')
194 parser.add_argument('--output_plot_path', required=False, help='Path to the elbow and gap methods plot. Accepted formats: png.')
196 args = parser.parse_args()
197 args.config = args.config or "{}"
198 properties = settings.ConfReader(config=args.config).get_prop_dic()
200 # Specific call of each building block
201 agglomerative_coefficient(input_dataset_path=args.input_dataset_path,
202 output_results_path=args.output_results_path,
203 output_plot_path=args.output_plot_path,
204 properties=properties)
207if __name__ == '__main__':
208 main()