Coverage for biobb_ml/neural_networks/autoencoder_neural_network.py: 89%

150 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the AutoencoderNeuralNetwork class and the command line interface.""" 

4import argparse 

5import h5py 

6import json 

7import numpy as np 

8import pandas as pd 

9from biobb_common.generic.biobb_object import BiobbObject 

10from tensorflow.python.keras.saving import hdf5_format 

11from tensorflow.keras.models import Model 

12from tensorflow.keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed 

13from biobb_common.configuration import settings 

14from biobb_common.tools import file_utils as fu 

15from biobb_common.tools.file_utils import launchlogger 

16from biobb_ml.neural_networks.common import check_input_path, check_output_path 

17 

18 

19class AutoencoderNeuralNetwork(BiobbObject): 

20 """ 

21 | biobb_ml AutoencoderNeuralNetwork 

22 | Wrapper of the TensorFlow Keras LSTM method for encoding. 

23 | Fits and tests a given dataset and save the compiled model for an Autoencoder Neural Network. Visit the `LSTM documentation page <https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTM>`_ in the TensorFlow Keras official website for further information. 

24 

25 Args: 

26 input_decode_path (str): Path to the input decode dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/neural_networks/dataset_autoencoder_decode.csv>`_. Accepted formats: csv (edam:format_3752). 

27 input_predict_path (str) (Optional): Path to the input predict dataset. File type: input. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/data/neural_networks/dataset_autoencoder_predict.csv>`_. Accepted formats: csv (edam:format_3752). 

28 output_model_path (str): Path to the output model file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/neural_networks/ref_output_model_autoencoder.h5>`_. Accepted formats: h5 (edam:format_3590). 

29 output_test_decode_path (str) (Optional): Path to the test decode table file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/neural_networks/ref_output_test_decode_autoencoder.csv>`_. Accepted formats: csv (edam:format_3752). 

30 output_test_predict_path (str) (Optional): Path to the test predict table file. File type: output. `Sample file <https://github.com/bioexcel/biobb_ml/raw/master/biobb_ml/test/reference/neural_networks/ref_output_test_predict_autoencoder.csv>`_. Accepted formats: csv (edam:format_3752). 

31 properties (dic - Python dictionary object containing the tool parameters, not input/output files): 

32 * **optimizer** (*string*) - ("Adam") Name of optimizer instance. Values: Adadelta (Adadelta optimization is a stochastic gradient descent method that is based on adaptive learning rate per dimension to address two drawbacks: the continual decay of learning rates throughout training and the need for a manually selected global learning rate), Adagrad (Adagrad is an optimizer with parameter-specific learning rates; which are adapted relative to how frequently a parameter gets updated during training. The more updates a parameter receives; the smaller the updates), Adam (Adam optimization is a stochastic gradient descent method that is based on adaptive estimation of first-order and second-order moments), Adamax (It is a variant of Adam based on the infinity norm. Default parameters follow those provided in the paper. Adamax is sometimes superior to adam; specially in models with embeddings), Ftrl (Optimizer that implements the FTRL algorithm), Nadam (Much like Adam is essentially RMSprop with momentum; Nadam is Adam with Nesterov momentum), RMSprop (Optimizer that implements the RMSprop algorithm), SGD (Gradient descent -with momentum- optimizer). 

33 * **learning_rate** (*float*) - (0.02) [0~100|0.01] Determines the step size at each iteration while moving toward a minimum of a loss function 

34 * **batch_size** (*int*) - (100) [0~1000|1] Number of samples per gradient update. 

35 * **max_epochs** (*int*) - (100) [0~1000|1] Number of epochs to train the model. As the early stopping is enabled, this is a maximum. 

36 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

37 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

38 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

39 

40 Examples: 

41 This is a use example of how to use the building block from Python:: 

42 

43 from biobb_ml.neural_networks.autoencoder_neural_network import autoencoder_neural_network 

44 prop = { 

45 'optimizer': 'Adam', 

46 'learning_rate': 0.01, 

47 'batch_size': 32, 

48 'max_epochs': 300 

49 } 

50 autoencoder_neural_network(input_decode_path='/path/to/myDecodeDataset.csv', 

51 output_model_path='/path/to/newModel.h5', 

52 input_predict_path='/path/to/myPredictDataset.csv', 

53 output_test_decode_path='/path/to/newDecodeDataset.csv', 

54 output_test_predict_path='/path/to/newPredictDataset.csv', 

55 properties=prop) 

56 

57 Info: 

58 * wrapped_software: 

59 * name: TensorFlow Keras LSTM 

60 * version: >2.1.0 

61 * license: MIT 

62 * ontology: 

63 * name: EDAM 

64 * schema: http://edamontology.org/EDAM.owl 

65 

66 """ 

67 

68 def __init__(self, input_decode_path, output_model_path, 

69 input_predict_path=None, output_test_decode_path=None, 

70 output_test_predict_path=None, properties=None, **kwargs) -> None: 

71 properties = properties or {} 

72 

73 # Call parent class constructor 

74 super().__init__(properties) 

75 self.locals_var_dict = locals().copy() 

76 

77 # Input/Output files 

78 self.io_dict = { 

79 "in": {"input_decode_path": input_decode_path, "input_predict_path": input_predict_path}, 

80 "out": {"output_model_path": output_model_path, "output_test_decode_path": output_test_decode_path, "output_test_predict_path": output_test_predict_path} 

81 } 

82 

83 # Properties specific for BB 

84 self.optimizer = properties.get('optimizer', 'Adam') 

85 self.learning_rate = properties.get('learning_rate', 0.02) 

86 self.batch_size = properties.get('batch_size', 100) 

87 self.max_epochs = properties.get('max_epochs', 100) 

88 self.properties = properties 

89 

90 # Check the properties 

91 self.check_properties(properties) 

92 self.check_arguments() 

93 

94 def check_data_params(self, out_log, err_log): 

95 """ Checks all the input/output paths and parameters """ 

96 self.io_dict["in"]["input_decode_path"] = check_input_path(self.io_dict["in"]["input_decode_path"], "input_decode_path", False, out_log, self.__class__.__name__) 

97 if self.io_dict["in"]["input_predict_path"]: 

98 self.io_dict["in"]["input_predict_path"] = check_input_path(self.io_dict["in"]["input_predict_path"], "input_predict_path", True, out_log, self.__class__.__name__) 

99 self.io_dict["out"]["output_model_path"] = check_output_path(self.io_dict["out"]["output_model_path"], "output_model_path", False, out_log, self.__class__.__name__) 

100 if self.io_dict["out"]["output_test_decode_path"]: 

101 self.io_dict["out"]["output_test_decode_path"] = check_output_path(self.io_dict["out"]["output_test_decode_path"], "output_test_decode_path", True, out_log, self.__class__.__name__) 

102 if self.io_dict["out"]["output_test_predict_path"]: 

103 self.io_dict["out"]["output_test_predict_path"] = check_output_path(self.io_dict["out"]["output_test_predict_path"], "output_test_predict_path", True, out_log, self.__class__.__name__) 

104 

105 def build_model(self, n_in, n_out=None): 

106 """ Builds Neural network """ 

107 

108 # outputs list 

109 outputs = [] 

110 

111 # define encoder 

112 visible = Input(shape=(n_in, 1)) 

113 encoder = LSTM(100, activation='relu')(visible) 

114 

115 # define reconstruct decoder 

116 decoder1 = RepeatVector(n_in)(encoder) 

117 decoder1 = LSTM(100, activation='relu', return_sequences=True)(decoder1) 

118 decoder1 = TimeDistributed(Dense(1))(decoder1) 

119 

120 outputs.append(decoder1) 

121 

122 # define predict decoder 

123 if n_out: 

124 decoder2 = RepeatVector(n_out)(encoder) 

125 decoder2 = LSTM(100, activation='relu', return_sequences=True)(decoder2) 

126 decoder2 = TimeDistributed(Dense(1))(decoder2) 

127 outputs.append(decoder2) 

128 

129 # tie it together 

130 model = Model(inputs=visible, outputs=outputs) 

131 

132 return model 

133 

134 @launchlogger 

135 def launch(self) -> int: 

136 """Execute the :class:`AutoencoderNeuralNetwork <neural_networks.autoencoder_neural_network.AutoencoderNeuralNetwork>` neural_networks.autoencoder_neural_network.AutoencoderNeuralNetwork object.""" 

137 

138 # check input/output paths and parameters 

139 self.check_data_params(self.out_log, self.err_log) 

140 

141 # Setup Biobb 

142 if self.check_restart(): 

143 return 0 

144 self.stage_files() 

145 

146 # load decode dataset 

147 fu.log('Getting decode dataset from %s' % self.io_dict["in"]["input_decode_path"], self.out_log, self.global_log) 

148 data_dec = pd.read_csv(self.io_dict["in"]["input_decode_path"]) 

149 seq_in = np.array(data_dec) 

150 

151 # reshape input into [samples, timesteps, features] 

152 n_in = len(seq_in) 

153 seq_in = seq_in.reshape((1, n_in, 1)) 

154 

155 # load predict dataset 

156 n_out = None 

157 if (self.io_dict["in"]["input_predict_path"]): 

158 fu.log('Getting predict dataset from %s' % self.io_dict["in"]["input_predict_path"], self.out_log, self.global_log) 

159 data_pred = pd.read_csv(self.io_dict["in"]["input_predict_path"]) 

160 seq_out = np.array(data_pred) 

161 

162 # reshape output into [samples, timesteps, features] 

163 n_out = len(seq_out) 

164 seq_out = seq_out.reshape((1, n_out, 1)) 

165 

166 # build model 

167 fu.log('Building model', self.out_log, self.global_log) 

168 model = self.build_model(n_in, n_out) 

169 

170 # model summary 

171 stringlist = [] 

172 model.summary(print_fn=lambda x: stringlist.append(x)) 

173 model_summary = "\n".join(stringlist) 

174 fu.log('Model summary:\n\n%s\n' % model_summary, self.out_log, self.global_log) 

175 

176 # get optimizer 

177 mod = __import__('tensorflow.keras.optimizers', fromlist=[self.optimizer]) 

178 opt_class = getattr(mod, self.optimizer) 

179 opt = opt_class(lr=self.learning_rate) 

180 # compile model 

181 model.compile(optimizer=opt, loss='mse', metrics=['mse', 'mae']) 

182 

183 # fitting 

184 fu.log('Training model', self.out_log, self.global_log) 

185 y_list = [seq_in] 

186 if n_out: 

187 y_list.append(seq_out) 

188 # fit the model 

189 mf = model.fit(seq_in, 

190 y_list, 

191 batch_size=self.batch_size, 

192 epochs=self.max_epochs, 

193 verbose=1) 

194 

195 train_metrics = pd.DataFrame() 

196 metric = [] 

197 coefficient = [] 

198 for key, lst in mf.history.items(): 

199 metric.append(' '.join(x.capitalize() or '_' for x in key.split('_'))) 

200 coefficient.append(lst[-1]) 

201 

202 train_metrics['metric'] = metric 

203 train_metrics['coefficient'] = coefficient 

204 

205 fu.log('Calculating metrics\n\nMETRICS TABLE\n\n%s\n' % train_metrics, self.out_log, self.global_log) 

206 

207 # predicting 

208 fu.log('Predicting model', self.out_log, self.global_log) 

209 yhat = model.predict(seq_in, verbose=1) 

210 

211 decoding_table = pd.DataFrame() 

212 if (self.io_dict["in"]["input_predict_path"]): 

213 decoding_table['reconstructed'] = np.squeeze(np.asarray(yhat[0][0])) 

214 decoding_table['original'] = data_dec 

215 else: 

216 decoding_table['reconstructed'] = np.squeeze(np.asarray(yhat[0])) 

217 decoding_table['original'] = np.squeeze(np.asarray(data_dec)) 

218 decoding_table['residual'] = decoding_table['original'] - decoding_table['reconstructed'] 

219 decoding_table['difference %'] = np.absolute(decoding_table['residual']/decoding_table['original']*100) 

220 pd.set_option('display.float_format', lambda x: '%.5f' % x) 

221 # sort by difference in % 

222 decoding_table = decoding_table.sort_values(by=['difference %']) 

223 decoding_table = decoding_table.reset_index(drop=True) 

224 fu.log('RECONSTRUCTION TABLE\n\n%s\n' % decoding_table, self.out_log, self.global_log) 

225 

226 # save reconstruction data 

227 if (self.io_dict["out"]["output_test_decode_path"]): 

228 fu.log('Saving reconstruction data to %s' % self.io_dict["out"]["output_test_decode_path"], self.out_log, self.global_log) 

229 decoding_table.to_csv(self.io_dict["out"]["output_test_decode_path"], index=False, header=True) 

230 

231 if (self.io_dict["in"]["input_predict_path"]): 

232 prediction_table = pd.DataFrame() 

233 prediction_table['predicted'] = np.squeeze(np.asarray(yhat[1][0])) 

234 prediction_table['original'] = data_pred 

235 prediction_table['residual'] = prediction_table['original'] - prediction_table['predicted'] 

236 prediction_table['difference %'] = np.absolute(prediction_table['residual']/prediction_table['original']*100) 

237 pd.set_option('display.float_format', lambda x: '%.5f' % x) 

238 # sort by difference in % 

239 prediction_table = prediction_table.sort_values(by=['difference %']) 

240 prediction_table = prediction_table.reset_index(drop=True) 

241 fu.log('PREDICTION TABLE\n\n%s\n' % prediction_table, self.out_log, self.global_log) 

242 

243 # save decoding data 

244 if (self.io_dict["out"]["output_test_predict_path"]): 

245 fu.log('Saving prediction data to %s' % self.io_dict["out"]["output_test_predict_path"], self.out_log, self.global_log) 

246 prediction_table.to_csv(self.io_dict["out"]["output_test_predict_path"], index=False, header=True) 

247 

248 # save model and parameters 

249 vars_obj = { 

250 'type': 'autoencoder' 

251 } 

252 variables = json.dumps(vars_obj) 

253 fu.log('Saving model to %s' % self.io_dict["out"]["output_model_path"], self.out_log, self.global_log) 

254 with h5py.File(self.io_dict["out"]["output_model_path"], mode='w') as f: 

255 hdf5_format.save_model_to_hdf5(model, f) 

256 f.attrs['variables'] = variables 

257 

258 # Copy files to host 

259 self.copy_to_host() 

260 

261 self.tmp_files.extend([ 

262 self.stage_io_dict.get("unique_dir") 

263 ]) 

264 self.remove_tmp_files() 

265 

266 self.check_arguments(output_files_created=True, raise_exception=False) 

267 

268 return 0 

269 

270 

271def autoencoder_neural_network(input_decode_path: str, output_model_path: str, input_predict_path: str = None, output_test_decode_path: str = None, output_test_predict_path: str = None, properties: dict = None, **kwargs) -> int: 

272 """Execute the :class:`AutoencoderNeuralNetwork <neural_networks.autoencoder_neural_network.AutoencoderNeuralNetwork>` class and 

273 execute the :meth:`launch() <neural_networks.autoencoder_neural_network.AutoencoderNeuralNetwork.launch>` method.""" 

274 

275 return AutoencoderNeuralNetwork(input_decode_path=input_decode_path, 

276 output_model_path=output_model_path, 

277 input_predict_path=input_predict_path, 

278 output_test_decode_path=output_test_decode_path, 

279 output_test_predict_path=output_test_predict_path, 

280 properties=properties, **kwargs).launch() 

281 

282 

283def main(): 

284 """Command line execution of this building block. Please check the command line documentation.""" 

285 parser = argparse.ArgumentParser(description="Wrapper of the TensorFlow Keras LSTM method for encoding.", formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) 

286 parser.add_argument('--config', required=False, help='Configuration file') 

287 

288 # Specific args of each building block 

289 required_args = parser.add_argument_group('required arguments') 

290 required_args.add_argument('--input_decode_path', required=True, help='Path to the input decode dataset. Accepted formats: csv.') 

291 parser.add_argument('--input_predict_path', required=False, help='Path to the input predict dataset. Accepted formats: csv.') 

292 required_args.add_argument('--output_model_path', required=True, help='Path to the output model file. Accepted formats: h5.') 

293 parser.add_argument('--output_test_decode_path', required=False, help='Path to the test decode table file. Accepted formats: csv.') 

294 parser.add_argument('--output_test_predict_path', required=False, help='Path to the test predict table file. Accepted formats: csv.') 

295 

296 args = parser.parse_args() 

297 args.config = args.config or "{}" 

298 properties = settings.ConfReader(config=args.config).get_prop_dic() 

299 

300 # Specific call of each building block 

301 autoencoder_neural_network(input_decode_path=args.input_decode_path, 

302 output_model_path=args.output_model_path, 

303 input_predict_path=args.input_predict_path, 

304 output_test_decode_path=args.output_test_decode_path, 

305 output_test_predict_path=args.output_test_predict_path, 

306 properties=properties) 

307 

308 

309if __name__ == '__main__': 

310 main()