Project

General

Profile

Feature #4277 » ip_aggr_test.py

Radko Krkoš, 08/09/2019 10:03 AM

 
1
#!/usr/bin/env python3
2
"""Mentat IP aggregation correctness tester"""
3

    
4
import timeit
5
import random
6
import ipaddress
7
import sys
8
import psycopg2
9
from psycopg2 import sql
10

    
11
# Name of the PostgreSQL database; passed as `dbname` to psycopg2.connect() in main().
DBNAME = 'mentat_events'
12
# Number of random bits drawn for each test address in main(). With 32 bits the
# integer is always below 2**32, which ipaddress.ip_address() turns into an
# IPv4 address.
BITS = 32
13

    
14
def _find_mismatches(con, cursor, query, ip):
    """Return ids of events for which the two lookups of `ip` disagree.

    `query` is executed with `ip` bound to each of its three placeholders and
    every returned row is flattened into a single list of ids.
    """
    cursor.execute(query, (ip, ip, ip))
    rows = cursor.fetchall()
    # The check only reads data; roll back so that the transaction psycopg2
    # opens implicitly (default non-autocommit mode) does not stay open
    # between queries.
    con.rollback()
    return [eid for row in rows for eid in row]


def main():
    """Test random addresses against the events table until a mismatch or Ctrl-C.

    For every random address two lookups are compared, once for the source
    columns and once for the target columns: a plain match against the
    address array column, and the same match additionally filtered through
    the corresponding `*_aggr_ip4` column. An event id that comes back from
    only one of the two lookups is reported as a mismatch and ends the run.

    A dot is printed for every address that passes. The number of addresses
    drawn and the elapsed time are always printed on exit. The connection and
    cursor are closed before returning.
    """
    # UNION ALL keeps duplicates, so an id found by both sub-selects has
    # count(*) = 2; HAVING count(*) = 1 leaves only ids found by exactly one.
    # NOTE(review): `&&` is presumably the PostgreSQL overlap operator for the
    # column types involved - the schema is not visible here, confirm.
    raw_query = "SELECT id FROM (SELECT id FROM events WHERE %s && ANY({srctgt}) UNION ALL SELECT id FROM events WHERE ({aggr} && %s AND %s && ANY({srctgt}))) AS sub GROUP BY id HAVING count(*) = 1"
    columns = (('source_ip', 'source_ip_aggr_ip4'), ('target_ip', 'target_ip_aggr_ip4'))

    start = timeit.default_timer()
    count = 0
    con = psycopg2.connect(dbname=DBNAME)
    try:
        with con.cursor() as cursor:
            # The composed statements do not depend on the tested address, so
            # they are built once instead of once per iteration.
            queries = [
                (srctgt, sql.SQL(raw_query).format(srctgt=sql.Identifier(srctgt), aggr=sql.Identifier(aggr)))
                for srctgt, aggr in columns
            ]
            try:
                while True:
                    count += 1
                    ip = str(ipaddress.ip_address(random.getrandbits(BITS)))
                    for srctgt, query in queries:
                        mismatched = _find_mismatches(con, cursor, query, ip)
                        if mismatched:
                            print('\nIP: %s (%s) mismatch in %s' % (ip, srctgt, mismatched))
                            # Stop at the first mismatch; the finally clauses
                            # still print the summary and close the connection.
                            return
                    print('.', end='')
                    sys.stdout.flush()
            except KeyboardInterrupt:
                # Ctrl-C is the intended way to end an otherwise endless run.
                pass
            finally:
                print('\nTotal tested %d addresses in %fs' % (count, timeit.default_timer() - start))
    finally:
        con.close()
38

    
39

    
40
# Run the tester only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
    (1-1/1)