From f0bc558ae1cb2e7b8d5ab3a8a9dcdb8cc5d785ca Mon Sep 17 00:00:00 2001 From: Radko Krkos Date: Mon, 21 Jan 2019 20:04:42 +0100 Subject: [PATCH] DRAFT v3: Fix eternal 500 in Hawat after DB restart * Uses decorator for exception management in data access funtions and retries on DB connection loss (generally DB server restart). --- lib/mentat/services/eventstorage.py | 360 ++++++++++++++---------------------- 1 file changed, 137 insertions(+), 223 deletions(-) diff --git a/lib/mentat/services/eventstorage.py b/lib/mentat/services/eventstorage.py index 774ec74..1687044 100644 --- a/lib/mentat/services/eventstorage.py +++ b/lib/mentat/services/eventstorage.py @@ -106,6 +106,12 @@ class StorageIntegrityError(EventStorageException): """ +class StorageConnectionException(EventStorageException): + """ + Class for custom event storage exceptions related to database connection errors. + """ + + class DataError(EventStorageException): """ Class for custom event storage exceptions related to data errors. @@ -682,7 +688,9 @@ class EventStorageService: :param conncfg: Connection arguments. """ conncfg['cursor_factory'] = psycopg2.extras.NamedTupleCursor - self.connection = psycopg2.connect(**conncfg) + if not hasattr(self, "dsn"): + self.dsn = conncfg + self.connection = psycopg2.connect(**self.dsn) self.cursor = None self.cursor_new() @@ -729,6 +737,38 @@ class EventStorageService: self.cursor = EventStorageCursor(self.connection.cursor()) return self.cursor + def handle_db_exceptions(func): + """ + Handle exceptions raised during database interfacing operations. + """ + def exc_handle_wrapper(self, *args, **kwargs): + exc_store = None + for _ in range(2): + try: + return func(self, *args, **kwargs) + + except psycopg2.DataError as err: + self.rollback() + raise DataError(str(err)) from err + + except psycopg2.OperationalError as err: + self.__init__() + exc_store = err + continue + + except psycopg2.IntegrityError as err: + self.rollback() + raise StorageIntegrityError(str(err)) from err + + except psycopg2.DatabaseError as err: + self.rollback() + raise EventStorageException(str(err)) from err + + raise EventStorageException("DB connection error during data access") from exc_store + + return exc_handle_wrapper + + @handle_db_exceptions def database_create(self): """ Create database SQL schema. @@ -752,15 +792,11 @@ class EventStorageService: ) ) - try: - for query in create_table_sqls: - self.cursor.execute(query) - self.commit() - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + for query in create_table_sqls: + self.cursor.execute(query) + self.commit() + @handle_db_exceptions def index_create(self): """ Create default set of table indices. @@ -793,15 +829,11 @@ class EventStorageService: ) ) - try: - for query in create_index_sqls: - self.cursor.execute(query) - self.commit() - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + for query in create_index_sqls: + self.cursor.execute(query) + self.commit() + @handle_db_exceptions def database_drop(self): """ Drop database SQL schema. @@ -825,15 +857,11 @@ class EventStorageService: ) ) - try: - for query in drop_table_sqls: - self.cursor.execute(query) - self.commit() - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + for query in drop_table_sqls: + self.cursor.execute(query) + self.commit() + @handle_db_exceptions def index_drop(self): """ Drop default set of table indices. @@ -863,17 +891,13 @@ class EventStorageService: ) ) ) - try: - for query in drop_index_sqls: - self.cursor.execute(query) - self.commit() - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + for query in drop_index_sqls: + self.cursor.execute(query) + self.commit() #--------------------------------------------------------------------------- + @handle_db_exceptions def insert_event(self, idea_event): """ This method is a convenience wrapper for underlying @@ -883,18 +907,10 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - self.cursor.insert_event(idea_event) - self.commit() - - except psycopg2.IntegrityError as err: - self.rollback() - raise StorageIntegrityError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + self.cursor.insert_event(idea_event) + self.commit() + @handle_db_exceptions def fetch_event(self, eventid): """ This method is a convenience wrapper for underlying @@ -904,15 +920,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - result = self.cursor.fetch_event(eventid) - self.commit() - return result - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + result = self.cursor.fetch_event(eventid) + self.commit() + return result + @handle_db_exceptions def delete_event(self, eventid): """ This method is a convenience wrapper for underlying @@ -922,14 +934,10 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - self.cursor.delete_event(eventid) - self.commit() - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + self.cursor.delete_event(eventid) + self.commit() + @handle_db_exceptions def count_events(self, parameters = None): """ This method is a convenience wrapper for underlying @@ -939,19 +947,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - result = self.cursor.count_events(parameters) - self.commit() - return result - - except psycopg2.DataError as err: - self.rollback() - raise DataError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + result = self.cursor.count_events(parameters) + self.commit() + return result + @handle_db_exceptions def search_events(self, parameters = None, event_factory = record_to_idea, columns = EVENT_COLUMNS): """ This method is a convenience wrapper for underlying @@ -961,19 +961,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - count, result = self.cursor.search_events(parameters, event_factory, columns) - self.commit() - return count, result - - except psycopg2.DataError as err: - self.rollback() - raise DataError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + count, result = self.cursor.search_events(parameters, event_factory, columns) + self.commit() + return count, result + @handle_db_exceptions def search_column_with(self, column, function = 'min'): """ This method is a convenience wrapper for underlying @@ -983,19 +975,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - result = self.cursor.search_column_with(column, function) - self.commit() - return result - - except psycopg2.DataError as err: - self.rollback() - raise DataError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + result = self.cursor.search_column_with(column, function) + self.commit() + return result + @handle_db_exceptions def watchdog_events(self, interval): """ Perform watchdog operation on event database: Check if any new events were @@ -1005,20 +989,11 @@ class EventStorageService: :return: ``True`` in case any events were stored within given interval, ``False`` otherwise. :rtype: bool """ - try: - result = self.cursor.watchdog_events(interval) - self.commit() - return result - - except psycopg2.DataError as err: - self.rollback() - raise DataError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err - + result = self.cursor.watchdog_events(interval) + self.commit() + return result + @handle_db_exceptions def delete_events(self, parameters = None): """ Delete IDEA messages in database according to given optional parameters. @@ -1028,15 +1003,11 @@ class EventStorageService: :return: Number of deleted events. :rtype: int """ - try: - count = self.cursor.delete_events(parameters) - self.commit() - return count - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + count = self.cursor.delete_events(parameters) + self.commit() + return count + @handle_db_exceptions def distinct_values(self, column): """ Return distinct values of given table column. @@ -1049,29 +1020,25 @@ class EventStorageService: :rtype: list """ enum_table = "enum_{}".format(column) - try: - # Build and execute query for updating enumeration table. - enum_query = psycopg2.sql.SQL("INSERT INTO {} (SELECT * FROM (").format(psycopg2.sql.Identifier(enum_table)) - if column not in ('cesnet_eventclass', 'cesnet_eventseverity'): - enum_query += psycopg2.sql.SQL("SELECT unnest({})").format(psycopg2.sql.Identifier(column)) - else: - enum_query += psycopg2.sql.SQL("SELECT {}").format(psycopg2.sql.Identifier(column)) - enum_query += psycopg2.sql.SQL(' AS data, max(cesnet_storagetime) AS last_seen FROM events WHERE cesnet_storagetime >= COALESCE((SELECT max(last_seen) FROM {}), (SELECT min(cesnet_storagetime) FROM events)) GROUP BY data) AS enum WHERE data IS NOT NULL) ON CONFLICT (data) DO UPDATE SET last_seen = excluded.last_seen').format(psycopg2.sql.Identifier(enum_table)) - self.cursor.execute(enum_query) - self.commit() - - # Return all entries from recetly updated enumeration table. - self.cursor.execute( - psycopg2.sql.SQL("SELECT data FROM {} ORDER BY data").format(psycopg2.sql.Identifier(enum_table)) - ) - result_raw = self.cursor.fetchall() - self.commit() - return [item[0] for item in result_raw if item[0] is not None] + # Build and execute query for updating enumeration table. + enum_query = psycopg2.sql.SQL("INSERT INTO {} (SELECT * FROM (").format(psycopg2.sql.Identifier(enum_table)) + if column not in ('cesnet_eventclass', 'cesnet_eventseverity'): + enum_query += psycopg2.sql.SQL("SELECT unnest({})").format(psycopg2.sql.Identifier(column)) + else: + enum_query += psycopg2.sql.SQL("SELECT {}").format(psycopg2.sql.Identifier(column)) + enum_query += psycopg2.sql.SQL(' AS data, max(cesnet_storagetime) AS last_seen FROM events WHERE cesnet_storagetime >= COALESCE((SELECT max(last_seen) FROM {}), (SELECT min(cesnet_storagetime) FROM events)) GROUP BY data) AS enum WHERE data IS NOT NULL) ON CONFLICT (data) DO UPDATE SET last_seen = excluded.last_seen').format(psycopg2.sql.Identifier(enum_table)) + self.cursor.execute(enum_query) + self.commit() - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + # Return all entries from recetly updated enumeration table. + self.cursor.execute( + psycopg2.sql.SQL("SELECT data FROM {} ORDER BY data").format(psycopg2.sql.Identifier(enum_table)) + ) + result_raw = self.cursor.fetchall() + self.commit() + return [item[0] for item in result_raw if item[0] is not None] + @handle_db_exceptions def table_cleanup(self, table, column, ttl): """ This method is a convenience wrapper for underlying @@ -1081,15 +1048,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - count = self.cursor.table_cleanup(table, column, ttl) - self.commit() - return count - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + count = self.cursor.table_cleanup(table, column, ttl) + self.commit() + return count + @handle_db_exceptions def threshold_set(self, key, thresholdtime, relapsetime, ttl): """ This method is a convenience wrapper for underlying @@ -1099,18 +1062,10 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - self.cursor.threshold_set(key, thresholdtime, relapsetime, ttl) - self.commit() - - except psycopg2.IntegrityError as err: - self.rollback() - raise StorageIntegrityError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + self.cursor.threshold_set(key, thresholdtime, relapsetime, ttl) + self.commit() + @handle_db_exceptions def threshold_check(self, key, threshold): """ This method is a convenience wrapper for underlying @@ -1120,15 +1075,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - result = self.cursor.threshold_check(key, threshold) - self.commit() - return result - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + result = self.cursor.threshold_check(key, threshold) + self.commit() + return result + @handle_db_exceptions def threshold_save(self, eventid, keyid, group_name, severity, createtime): """ This method is a convenience wrapper for underlying @@ -1138,18 +1089,10 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - self.cursor.threshold_save(eventid, keyid, group_name, severity, createtime) - self.commit() - - except psycopg2.IntegrityError as err: - self.rollback() - raise StorageIntegrityError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + self.cursor.threshold_save(eventid, keyid, group_name, severity, createtime) + self.commit() + @handle_db_exceptions def thresholds_count(self): """ This method is a convenience wrapper for underlying @@ -1159,19 +1102,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - result = self.cursor.thresholds_count() - self.commit() - return result - - except psycopg2.DataError as err: - self.rollback() - raise DataError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + result = self.cursor.thresholds_count() + self.commit() + return result + @handle_db_exceptions def thresholds_clean(self, threshold): """ This method is a convenience wrapper for underlying @@ -1181,15 +1116,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - count = self.cursor.thresholds_clean(threshold) - self.commit() - return count - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + count = self.cursor.thresholds_clean(threshold) + self.commit() + return count + @handle_db_exceptions def search_relapsed_events(self, group_name, severity, ttl): """ This method is a convenience wrapper for underlying @@ -1199,15 +1130,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - events = self.cursor.search_relapsed_events(group_name, severity, ttl) - self.commit() - return events - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + events = self.cursor.search_relapsed_events(group_name, severity, ttl) + self.commit() + return events + @handle_db_exceptions def thresholded_events_count(self): """ This method is a convenience wrapper for underlying @@ -1217,19 +1144,11 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - result = self.cursor.thresholded_events_count() - self.commit() - return result - - except psycopg2.DataError as err: - self.rollback() - raise DataError(str(err)) from err - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + result = self.cursor.thresholded_events_count() + self.commit() + return result + @handle_db_exceptions def thresholded_events_clean(self): """ This method is a convenience wrapper for underlying @@ -1239,14 +1158,9 @@ class EventStorageService: It will automatically commit transaction for successfull database operation and rollback the invalid one. """ - try: - count = self.cursor.thresholded_events_clean() - self.commit() - return count - - except psycopg2.DatabaseError as err: - self.rollback() - raise EventStorageException(str(err)) from err + count = self.cursor.thresholded_events_clean() + self.commit() + return count #--------------------------------------------------------------------------- -- 2.14.5