Project

General

Profile

Bug #4569 » 0001-DRAFT-v3-Fix-eternal-500-in-Hawat-after-DB-restart.patch

Radko Krkoš, 02/22/2019 07:53 PM

View differences:

lib/mentat/services/eventstorage.py
106 106
    """
107 107

  
108 108

  
109
class StorageConnectionException(EventStorageException):
110
    """
111
    Class for custom event storage exceptions related to database connection errors.
112
    """
113

  
114

  
109 115
class DataError(EventStorageException):
110 116
    """
111 117
    Class for custom event storage exceptions related to data errors.
......
682 688
        :param conncfg: Connection arguments.
683 689
        """
684 690
        conncfg['cursor_factory'] = psycopg2.extras.NamedTupleCursor
685
        self.connection = psycopg2.connect(**conncfg)
691
        if not hasattr(self, "dsn"):
692
            self.dsn = conncfg
693
        self.connection = psycopg2.connect(**self.dsn)
686 694
        self.cursor     = None
687 695
        self.cursor_new()
688 696

  
......
729 737
        self.cursor = EventStorageCursor(self.connection.cursor())
730 738
        return self.cursor
731 739

  
740
    def handle_db_exceptions(func):
741
        """
742
        Handle exceptions raised during database interfacing operations.
743
        """
744
        def exc_handle_wrapper(self, *args, **kwargs):
745
            exc_store = None
746
            for _ in range(2):
747
                try:
748
                    return func(self, *args, **kwargs)
749

  
750
                except psycopg2.DataError as err:
751
                    self.rollback()
752
                    raise DataError(str(err)) from err
753

  
754
                except psycopg2.OperationalError as err:
755
                    self.__init__()
756
                    exc_store = err
757
                    continue
758

  
759
                except psycopg2.IntegrityError as err:
760
                    self.rollback()
761
                    raise StorageIntegrityError(str(err)) from err
762

  
763
                except psycopg2.DatabaseError as err:
764
                    self.rollback()
765
                    raise EventStorageException(str(err)) from err
766

  
767
            raise EventStorageException("DB connection error during data access") from exc_store
768

  
769
        return exc_handle_wrapper
770

  
771
    @handle_db_exceptions
732 772
    def database_create(self):
733 773
        """
734 774
        Create database SQL schema.
......
752 792
                )
753 793
            )
754 794

  
755
        try:
756
            for query in create_table_sqls:
757
                self.cursor.execute(query)
758
                self.commit()
759

  
760
        except psycopg2.DatabaseError as err:
761
            self.rollback()
762
            raise EventStorageException(str(err)) from err
795
        for query in create_table_sqls:
796
            self.cursor.execute(query)
797
            self.commit()
763 798

  
799
    @handle_db_exceptions
764 800
    def index_create(self):
765 801
        """
766 802
        Create default set of table indices.
......
793 829
                )
794 830
            )
795 831

  
796
        try:
797
            for query in create_index_sqls:
798
                self.cursor.execute(query)
799
                self.commit()
800

  
801
        except psycopg2.DatabaseError as err:
802
            self.rollback()
803
            raise EventStorageException(str(err)) from err
832
        for query in create_index_sqls:
833
            self.cursor.execute(query)
834
            self.commit()
804 835

  
836
    @handle_db_exceptions
805 837
    def database_drop(self):
806 838
        """
807 839
        Drop database SQL schema.
......
825 857
                )
826 858
            )
827 859

  
828
        try:
829
            for query in drop_table_sqls:
830
                self.cursor.execute(query)
831
                self.commit()
832

  
833
        except psycopg2.DatabaseError as err:
834
            self.rollback()
835
            raise EventStorageException(str(err)) from err
860
        for query in drop_table_sqls:
861
            self.cursor.execute(query)
862
            self.commit()
836 863

  
864
    @handle_db_exceptions
837 865
    def index_drop(self):
838 866
        """
839 867
        Drop default set of table indices.
......
863 891
                    )
864 892
                )
865 893
            )
866
        try:
867
            for query in drop_index_sqls:
868
                self.cursor.execute(query)
869
                self.commit()
870

  
871
        except psycopg2.DatabaseError as err:
872
            self.rollback()
873
            raise EventStorageException(str(err)) from err
894
        for query in drop_index_sqls:
895
            self.cursor.execute(query)
896
            self.commit()
874 897

  
875 898
    #---------------------------------------------------------------------------
876 899

  
900
    @handle_db_exceptions
877 901
    def insert_event(self, idea_event):
878 902
        """
879 903
        This method is a convenience wrapper for underlying
......
883 907
        It will automatically commit transaction for successfull database operation
884 908
        and rollback the invalid one.
885 909
        """
886
        try:
887
            self.cursor.insert_event(idea_event)
888
            self.commit()
889

  
890
        except psycopg2.IntegrityError as err:
891
            self.rollback()
892
            raise StorageIntegrityError(str(err)) from err
893

  
894
        except psycopg2.DatabaseError as err:
895
            self.rollback()
896
            raise EventStorageException(str(err)) from err
910
        self.cursor.insert_event(idea_event)
911
        self.commit()
897 912

  
913
    @handle_db_exceptions
898 914
    def fetch_event(self, eventid):
899 915
        """
900 916
        This method is a convenience wrapper for underlying
......
904 920
        It will automatically commit transaction for successfull database operation
905 921
        and rollback the invalid one.
906 922
        """
907
        try:
908
            result = self.cursor.fetch_event(eventid)
909
            self.commit()
910
            return result
911

  
912
        except psycopg2.DatabaseError as err:
913
            self.rollback()
914
            raise EventStorageException(str(err)) from err
923
        result = self.cursor.fetch_event(eventid)
924
        self.commit()
925
        return result
915 926

  
927
    @handle_db_exceptions
916 928
    def delete_event(self, eventid):
917 929
        """
918 930
        This method is a convenience wrapper for underlying
......
922 934
        It will automatically commit transaction for successfull database operation
923 935
        and rollback the invalid one.
924 936
        """
925
        try:
926
            self.cursor.delete_event(eventid)
927
            self.commit()
928

  
929
        except psycopg2.DatabaseError as err:
930
            self.rollback()
931
            raise EventStorageException(str(err)) from err
937
        self.cursor.delete_event(eventid)
938
        self.commit()
932 939

  
940
    @handle_db_exceptions
933 941
    def count_events(self, parameters = None):
934 942
        """
935 943
        This method is a convenience wrapper for underlying
......
939 947
        It will automatically commit transaction for successfull database operation
940 948
        and rollback the invalid one.
941 949
        """
942
        try:
943
            result = self.cursor.count_events(parameters)
944
            self.commit()
945
            return result
946

  
947
        except psycopg2.DataError as err:
948
            self.rollback()
949
            raise DataError(str(err)) from err
950

  
951
        except psycopg2.DatabaseError as err:
952
            self.rollback()
953
            raise EventStorageException(str(err)) from err
950
        result = self.cursor.count_events(parameters)
951
        self.commit()
952
        return result
954 953

  
954
    @handle_db_exceptions
955 955
    def search_events(self, parameters = None, event_factory = record_to_idea, columns = EVENT_COLUMNS):
956 956
        """
957 957
        This method is a convenience wrapper for underlying
......
961 961
        It will automatically commit transaction for successfull database operation
962 962
        and rollback the invalid one.
963 963
        """
964
        try:
965
            count, result = self.cursor.search_events(parameters, event_factory, columns)
966
            self.commit()
967
            return count, result
968

  
969
        except psycopg2.DataError as err:
970
            self.rollback()
971
            raise DataError(str(err)) from err
972

  
973
        except psycopg2.DatabaseError as err:
974
            self.rollback()
975
            raise EventStorageException(str(err)) from err
964
        count, result = self.cursor.search_events(parameters, event_factory, columns)
965
        self.commit()
966
        return count, result
976 967

  
968
    @handle_db_exceptions
977 969
    def search_column_with(self, column, function = 'min'):
978 970
        """
979 971
        This method is a convenience wrapper for underlying
......
983 975
        It will automatically commit transaction for successfull database operation
984 976
        and rollback the invalid one.
985 977
        """
986
        try:
987
            result = self.cursor.search_column_with(column, function)
988
            self.commit()
989
            return result
990

  
991
        except psycopg2.DataError as err:
992
            self.rollback()
993
            raise DataError(str(err)) from err
994

  
995
        except psycopg2.DatabaseError as err:
996
            self.rollback()
997
            raise EventStorageException(str(err)) from err
978
        result = self.cursor.search_column_with(column, function)
979
        self.commit()
980
        return result
998 981

  
982
    @handle_db_exceptions
999 983
    def watchdog_events(self, interval):
1000 984
        """
1001 985
        Perform watchdog operation on event database: Check if any new events were
......
1005 989
        :return: ``True`` in case any events were stored within given interval, ``False`` otherwise.
1006 990
        :rtype: bool
1007 991
        """
1008
        try:
1009
            result = self.cursor.watchdog_events(interval)
1010
            self.commit()
1011
            return result
1012

  
1013
        except psycopg2.DataError as err:
1014
            self.rollback()
1015
            raise DataError(str(err)) from err
1016

  
1017
        except psycopg2.DatabaseError as err:
1018
            self.rollback()
1019
            raise EventStorageException(str(err)) from err
1020

  
992
        result = self.cursor.watchdog_events(interval)
993
        self.commit()
994
        return result
1021 995

  
996
    @handle_db_exceptions
1022 997
    def delete_events(self, parameters = None):
1023 998
        """
1024 999
        Delete IDEA messages in database according to given optional parameters.
......
1028 1003
        :return: Number of deleted events.
1029 1004
        :rtype: int
1030 1005
        """
1031
        try:
1032
            count = self.cursor.delete_events(parameters)
1033
            self.commit()
1034
            return count
1035

  
1036
        except psycopg2.DatabaseError as err:
1037
            self.rollback()
1038
            raise EventStorageException(str(err)) from err
1006
        count = self.cursor.delete_events(parameters)
1007
        self.commit()
1008
        return count
1039 1009

  
1010
    @handle_db_exceptions
1040 1011
    def distinct_values(self, column):
1041 1012
        """
1042 1013
        Return distinct values of given table column.
......
1049 1020
        :rtype: list
1050 1021
        """
1051 1022
        enum_table = "enum_{}".format(column)
1052
        try:
1053
            # Build and execute query for updating enumeration table.
1054
            enum_query = psycopg2.sql.SQL("INSERT INTO {} (SELECT * FROM (").format(psycopg2.sql.Identifier(enum_table))
1055
            if column not in ('cesnet_eventclass', 'cesnet_eventseverity'):
1056
                enum_query += psycopg2.sql.SQL("SELECT unnest({})").format(psycopg2.sql.Identifier(column))
1057
            else:
1058
                enum_query += psycopg2.sql.SQL("SELECT {}").format(psycopg2.sql.Identifier(column))
1059
            enum_query += psycopg2.sql.SQL(' AS data, max(cesnet_storagetime) AS last_seen FROM events WHERE cesnet_storagetime >= COALESCE((SELECT max(last_seen) FROM {}), (SELECT min(cesnet_storagetime) FROM events)) GROUP BY data) AS enum WHERE data IS NOT NULL) ON CONFLICT (data) DO UPDATE SET last_seen = excluded.last_seen').format(psycopg2.sql.Identifier(enum_table))
1060
            self.cursor.execute(enum_query)
1061
            self.commit()
1062

  
1063
            # Return all entries from recetly updated enumeration table.
1064
            self.cursor.execute(
1065
                psycopg2.sql.SQL("SELECT data FROM {} ORDER BY data").format(psycopg2.sql.Identifier(enum_table))
1066
            )
1067
            result_raw = self.cursor.fetchall()
1068
            self.commit()
1069
            return [item[0] for item in result_raw if item[0] is not None]
1023
        # Build and execute query for updating enumeration table.
1024
        enum_query = psycopg2.sql.SQL("INSERT INTO {} (SELECT * FROM (").format(psycopg2.sql.Identifier(enum_table))
1025
        if column not in ('cesnet_eventclass', 'cesnet_eventseverity'):
1026
            enum_query += psycopg2.sql.SQL("SELECT unnest({})").format(psycopg2.sql.Identifier(column))
1027
        else:
1028
            enum_query += psycopg2.sql.SQL("SELECT {}").format(psycopg2.sql.Identifier(column))
1029
        enum_query += psycopg2.sql.SQL(' AS data, max(cesnet_storagetime) AS last_seen FROM events WHERE cesnet_storagetime >= COALESCE((SELECT max(last_seen) FROM {}), (SELECT min(cesnet_storagetime) FROM events)) GROUP BY data) AS enum WHERE data IS NOT NULL) ON CONFLICT (data) DO UPDATE SET last_seen = excluded.last_seen').format(psycopg2.sql.Identifier(enum_table))
1030
        self.cursor.execute(enum_query)
1031
        self.commit()
1070 1032

  
1071
        except psycopg2.DatabaseError as err:
1072
            self.rollback()
1073
            raise EventStorageException(str(err)) from err
1033
        # Return all entries from recetly updated enumeration table.
1034
        self.cursor.execute(
1035
            psycopg2.sql.SQL("SELECT data FROM {} ORDER BY data").format(psycopg2.sql.Identifier(enum_table))
1036
        )
1037
        result_raw = self.cursor.fetchall()
1038
        self.commit()
1039
        return [item[0] for item in result_raw if item[0] is not None]
1074 1040

  
1041
    @handle_db_exceptions
1075 1042
    def table_cleanup(self, table, column, ttl):
1076 1043
        """
1077 1044
        This method is a convenience wrapper for underlying
......
1081 1048
        It will automatically commit transaction for successfull database operation
1082 1049
        and rollback the invalid one.
1083 1050
        """
1084
        try:
1085
            count = self.cursor.table_cleanup(table, column, ttl)
1086
            self.commit()
1087
            return count
1088

  
1089
        except psycopg2.DatabaseError as err:
1090
            self.rollback()
1091
            raise EventStorageException(str(err)) from err
1051
        count = self.cursor.table_cleanup(table, column, ttl)
1052
        self.commit()
1053
        return count
1092 1054

  
1055
    @handle_db_exceptions
1093 1056
    def threshold_set(self, key, thresholdtime, relapsetime, ttl):
1094 1057
        """
1095 1058
        This method is a convenience wrapper for underlying
......
1099 1062
        It will automatically commit transaction for successfull database operation
1100 1063
        and rollback the invalid one.
1101 1064
        """
1102
        try:
1103
            self.cursor.threshold_set(key, thresholdtime, relapsetime, ttl)
1104
            self.commit()
1105

  
1106
        except psycopg2.IntegrityError as err:
1107
            self.rollback()
1108
            raise StorageIntegrityError(str(err)) from err
1109

  
1110
        except psycopg2.DatabaseError as err:
1111
            self.rollback()
1112
            raise EventStorageException(str(err)) from err
1065
        self.cursor.threshold_set(key, thresholdtime, relapsetime, ttl)
1066
        self.commit()
1113 1067

  
1068
    @handle_db_exceptions
1114 1069
    def threshold_check(self, key, threshold):
1115 1070
        """
1116 1071
        This method is a convenience wrapper for underlying
......
1120 1075
        It will automatically commit transaction for successfull database operation
1121 1076
        and rollback the invalid one.
1122 1077
        """
1123
        try:
1124
            result = self.cursor.threshold_check(key, threshold)
1125
            self.commit()
1126
            return result
1127

  
1128
        except psycopg2.DatabaseError as err:
1129
            self.rollback()
1130
            raise EventStorageException(str(err)) from err
1078
        result = self.cursor.threshold_check(key, threshold)
1079
        self.commit()
1080
        return result
1131 1081

  
1082
    @handle_db_exceptions
1132 1083
    def threshold_save(self, eventid, keyid, group_name, severity, createtime):
1133 1084
        """
1134 1085
        This method is a convenience wrapper for underlying
......
1138 1089
        It will automatically commit transaction for successfull database operation
1139 1090
        and rollback the invalid one.
1140 1091
        """
1141
        try:
1142
            self.cursor.threshold_save(eventid, keyid, group_name, severity, createtime)
1143
            self.commit()
1144

  
1145
        except psycopg2.IntegrityError as err:
1146
            self.rollback()
1147
            raise StorageIntegrityError(str(err)) from err
1148

  
1149
        except psycopg2.DatabaseError as err:
1150
            self.rollback()
1151
            raise EventStorageException(str(err)) from err
1092
        self.cursor.threshold_save(eventid, keyid, group_name, severity, createtime)
1093
        self.commit()
1152 1094

  
1095
    @handle_db_exceptions
1153 1096
    def thresholds_count(self):
1154 1097
        """
1155 1098
        This method is a convenience wrapper for underlying
......
1159 1102
        It will automatically commit transaction for successfull database operation
1160 1103
        and rollback the invalid one.
1161 1104
        """
1162
        try:
1163
            result = self.cursor.thresholds_count()
1164
            self.commit()
1165
            return result
1166

  
1167
        except psycopg2.DataError as err:
1168
            self.rollback()
1169
            raise DataError(str(err)) from err
1170

  
1171
        except psycopg2.DatabaseError as err:
1172
            self.rollback()
1173
            raise EventStorageException(str(err)) from err
1105
        result = self.cursor.thresholds_count()
1106
        self.commit()
1107
        return result
1174 1108

  
1109
    @handle_db_exceptions
1175 1110
    def thresholds_clean(self, threshold):
1176 1111
        """
1177 1112
        This method is a convenience wrapper for underlying
......
1181 1116
        It will automatically commit transaction for successfull database operation
1182 1117
        and rollback the invalid one.
1183 1118
        """
1184
        try:
1185
            count = self.cursor.thresholds_clean(threshold)
1186
            self.commit()
1187
            return count
1188

  
1189
        except psycopg2.DatabaseError as err:
1190
            self.rollback()
1191
            raise EventStorageException(str(err)) from err
1119
        count = self.cursor.thresholds_clean(threshold)
1120
        self.commit()
1121
        return count
1192 1122

  
1123
    @handle_db_exceptions
1193 1124
    def search_relapsed_events(self, group_name, severity, ttl):
1194 1125
        """
1195 1126
        This method is a convenience wrapper for underlying
......
1199 1130
        It will automatically commit transaction for successfull database operation
1200 1131
        and rollback the invalid one.
1201 1132
        """
1202
        try:
1203
            events = self.cursor.search_relapsed_events(group_name, severity, ttl)
1204
            self.commit()
1205
            return events
1206

  
1207
        except psycopg2.DatabaseError as err:
1208
            self.rollback()
1209
            raise EventStorageException(str(err)) from err
1133
        events = self.cursor.search_relapsed_events(group_name, severity, ttl)
1134
        self.commit()
1135
        return events
1210 1136

  
1137
    @handle_db_exceptions
1211 1138
    def thresholded_events_count(self):
1212 1139
        """
1213 1140
        This method is a convenience wrapper for underlying
......
1217 1144
        It will automatically commit transaction for successfull database operation
1218 1145
        and rollback the invalid one.
1219 1146
        """
1220
        try:
1221
            result = self.cursor.thresholded_events_count()
1222
            self.commit()
1223
            return result
1224

  
1225
        except psycopg2.DataError as err:
1226
            self.rollback()
1227
            raise DataError(str(err)) from err
1228

  
1229
        except psycopg2.DatabaseError as err:
1230
            self.rollback()
1231
            raise EventStorageException(str(err)) from err
1147
        result = self.cursor.thresholded_events_count()
1148
        self.commit()
1149
        return result
1232 1150

  
1151
    @handle_db_exceptions
1233 1152
    def thresholded_events_clean(self):
1234 1153
        """
1235 1154
        This method is a convenience wrapper for underlying
......
1239 1158
        It will automatically commit transaction for successfull database operation
1240 1159
        and rollback the invalid one.
1241 1160
        """
1242
        try:
1243
            count = self.cursor.thresholded_events_clean()
1244
            self.commit()
1245
            return count
1246

  
1247
        except psycopg2.DatabaseError as err:
1248
            self.rollback()
1249
            raise EventStorageException(str(err)) from err
1161
        count = self.cursor.thresholded_events_clean()
1162
        self.commit()
1163
        return count
1250 1164

  
1251 1165
    #---------------------------------------------------------------------------
1252 1166

  
(1-1/4)