Coverage for biobb_ml/utils/common.py: 64%

73 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-03 14:57 +0000

1""" Common functions for package biobb_ml.utils """ 

2import csv 

3import re 

4from pathlib import Path, PurePath 

5from biobb_common.tools import file_utils as fu 

6from warnings import simplefilter 

7# ignore all future warnings 

8simplefilter(action='ignore', category=FutureWarning) 

9 

10 

11# CHECK PARAMETERS 

12 

def check_input_path(path, argument, out_log, classname):
    """Validate an input file path and return it unchanged.

    Args:
        path: Location of the input file.
        argument: Name of the argument the path was given for; it selects
            the accepted extensions in ``is_valid_file`` and is quoted in
            the error messages.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in every log / exit message.

    Returns:
        ``path`` itself when the file exists and its extension is accepted.

    Raises:
        SystemExit: If the file does not exist or its extension is not
            accepted for ``argument``.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting %s file, exiting' % argument, out_log)
        raise SystemExit(classname + ': Unexisting %s file' % argument)

    # The suffix carries its leading dot; the format table does not.
    extension = PurePath(path).suffix[1:]
    if is_valid_file(extension, argument):
        return path

    incompatible = classname + ': Format %s in %s file is not compatible' % (extension, argument)
    fu.log(incompatible, out_log)
    raise SystemExit(incompatible)

23 

24 

def check_output_path(path, argument, optional, out_log, classname):
    """Validate an output file path and return it unchanged.

    Args:
        path: Location where the output file will be written.
        argument: Name of the argument the path was given for; it selects
            the accepted extensions in ``is_valid_file`` and is quoted in
            the error messages.
        optional: When truthy, a falsy ``path`` is accepted and ``None`` is
            returned without further checks.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in every log / exit message.

    Returns:
        ``path`` when its parent folder exists and its extension is
        accepted, or ``None`` for an omitted optional path.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            not accepted for ``argument``.
    """
    if optional and not path:
        return None

    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ': Unexisting %s folder, exiting' % argument, out_log)
        raise SystemExit(classname + ': Unexisting %s folder' % argument)

    # The suffix carries its leading dot; the format table does not.
    extension = PurePath(path).suffix[1:]
    if is_valid_file(extension, argument):
        return path

    incompatible = classname + ': Format %s in %s file is not compatible' % (extension, argument)
    fu.log(incompatible, out_log)
    raise SystemExit(incompatible)

37 

38 

def is_valid_file(ext, argument):
    """Tell whether extension ``ext`` is accepted for ``argument``.

    Args:
        ext: File extension without the leading dot (e.g. ``'csv'``).
        argument: One of ``'input_dataset_path'``, ``'output_dataset_path'``,
            ``'output_plot_path'`` or ``'input_model_path'``.

    Returns:
        ``True`` if ``ext`` is listed for ``argument``, ``False`` otherwise.

    Raises:
        KeyError: If ``argument`` is not one of the known argument names.
    """
    accepted_by_argument = {
        'input_dataset_path': ['csv', 'txt'],
        'output_dataset_path': ['csv'],
        'output_plot_path': ['png'],
        'input_model_path': ['pkl'],
    }
    accepted = accepted_by_argument[argument]
    return ext in accepted

48 

49 

def check_mandatory_property(property, name, out_log, classname):
    """Ensure a mandatory property was supplied and return it.

    Args:
        property: Value of the property; any falsy value counts as missing.
        name: Property name quoted in the error messages.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in every log / exit message.

    Returns:
        ``property`` unchanged when it is truthy.

    Raises:
        SystemExit: If ``property`` is falsy.
    """
    if property:
        return property

    fu.log(classname + ': Unexisting %s property, exiting' % name, out_log)
    raise SystemExit(classname + ': Unexisting %s property' % name)

56 

57 

def getIndependentVars(independent_vars, data, out_log, classname):
    """Select the independent-variable columns from ``data``.

    ``independent_vars`` is inspected for the first of these keys:

    * ``'indexes'``: positions passed straight to ``data.iloc``.
    * ``'range'``: iterable of ``[start, end]`` pairs, each expanded to
      every position from ``start`` to ``end`` inclusive, then passed to
      ``data.iloc``.
    * ``'columns'``: labels passed to ``data.loc``.

    Args:
        independent_vars: Mapping holding one of the keys above.
        data: Object exposing ``iloc`` / ``loc`` column indexers
            (presumably a pandas DataFrame; confirm against callers).
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in the log / exit message.

    Returns:
        The result of the ``iloc`` / ``loc`` column selection.

    Raises:
        SystemExit: If none of the supported keys is present.
    """
    if 'indexes' in independent_vars:
        return data.iloc[:, independent_vars['indexes']]

    if 'range' in independent_vars:
        # Both bounds of each pair are inclusive.
        positions = [
            pos
            for bounds in independent_vars['range']
            for pos in range(bounds[0], bounds[1] + 1)
        ]
        return data.iloc[:, positions]

    if 'columns' in independent_vars:
        return data.loc[:, independent_vars['columns']]

    fu.log(classname + ': Incorrect independent_vars format', out_log)
    raise SystemExit(classname + ': Incorrect independent_vars format')

72 

73 

def getIndependentVarsList(independent_vars):
    """Render the selected independent variables as a ``', '``-joined string.

    ``independent_vars`` is inspected for the first of these keys:

    * ``'indexes'``: each index is stringified and joined.
    * ``'range'``: each ``[start, end]`` pair is expanded to every value
      from ``start`` to ``end`` inclusive, then stringified and joined.
    * ``'columns'``: each column label is stringified and joined.

    Args:
        independent_vars: Mapping holding one of the keys above.

    Returns:
        The joined string, or ``None`` when no supported key is present.
    """
    if 'indexes' in independent_vars:
        return ', '.join(str(x) for x in independent_vars['indexes'])
    if 'range' in independent_vars:
        # Both bounds of each pair are inclusive.
        return ', '.join(
            str(y)
            for r in independent_vars['range']
            for y in range(r[0], r[1] + 1)
        )
    if 'columns' in independent_vars:
        # Stringify labels like the other branches do, so non-string labels
        # (e.g. integer column names) do not make str.join raise TypeError.
        return ', '.join(str(c) for c in independent_vars['columns'])
    return None

81 

82 

def getTargetsList(targets, tool, out_log, classname):
    """Resolve the ``targets`` selector into a list of columns.

    ``targets`` is inspected for the first of these keys:

    * ``'indexes'``: returned as given.
    * ``'range'``: each ``[start, end]`` pair is expanded to every value
      from ``start`` to ``end`` inclusive.
    * ``'columns'``: returned as given.

    Args:
        targets: Mapping holding one of the keys above; may be empty or
            ``None``.
        tool: Name of the calling tool. For ``'drop'`` a missing selector is
            an error; for any other tool it is only logged.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in the log / exit message.

    Returns:
        The resolved list, or ``None`` when no targets were provided (and
        ``tool`` is not ``'drop'``) or no supported key is present.

    Raises:
        SystemExit: If no targets were provided and ``tool`` is ``'drop'``.
    """
    if not targets:
        if tool == 'drop':
            fu.log(classname + ': No targets provided, exiting', out_log)
            raise SystemExit(classname + ': No targets provided, exiting')
        fu.log('No targets provided, all columns will be taken', out_log)
        # Return here so that ``targets=None`` does not reach the membership
        # tests below (which would raise TypeError); an empty mapping already
        # produced ``None`` at this point.
        return None

    if 'indexes' in targets:
        return targets['indexes']
    if 'range' in targets:
        # Both bounds of each pair are inclusive.
        return [y for r in targets['range'] for y in range(r[0], r[1] + 1)]
    if 'columns' in targets:
        return targets['columns']
    return None

97 

98 

def getHeader(file):
    """Return the column names found in the first row of ``file``.

    The first row is read with ``csv.reader`` using its default dialect.
    When that yields a single field, the field is re-split on runs of
    whitespace and on ``;``, ``:`` and ``,`` so that headers written with
    another separator are still broken into individual names.

    Args:
        file: Path of the text file to read.

    Returns:
        List of header fields.

    Raises:
        StopIteration: If the file has no rows (raised by ``next``).
    """
    with open(file, newline='') as handle:
        first_row = next(csv.reader(handle))

    if len(first_row) != 1:
        return first_row

    # Single field: normalise every alternative separator to a comma.
    normalised = re.sub('\\s+|;|:|,|\t', ',', first_row[0])
    return normalised.split(",")