Feature #4277 » ip_aggr_test.py
1 |
#!/usr/bin/env python3
|
---|---|
2 |
"""Mentat IP aggregation correctness tester"""
|
3 |
|
4 |
import timeit |
5 |
import random |
6 |
import ipaddress |
7 |
import sys |
8 |
import psycopg2 |
9 |
from psycopg2 import sql |
10 |
|
11 |
# Name of the PostgreSQL database that main() connects to.
DBNAME = 'mentat_events'
# Width in bits of the randomly generated addresses; 32 yields IPv4 addresses.
BITS = 32
13 |
|
14 |
def main():
    """Probe the events table with random addresses until a mismatch or Ctrl-C.

    For every random BITS-bit address and for each (array column, aggregate
    column) pair, a query combines two lookups with UNION ALL: the plain
    ``%s && ANY(column)`` test, and the same test additionally filtered by
    the aggregate column. Grouping by id and keeping only groups of size one
    yields the ids that the combined result contains exactly once, i.e. where
    the two lookups disagree. Any such id is reported and the run stops.

    A summary with the number of addresses tried and the elapsed time is
    always printed at the end, including after a keyboard interrupt.
    """
    start = timeit.default_timer()
    count = 0
    raw_query = "SELECT id FROM (SELECT id FROM events WHERE %s && ANY({srctgt}) UNION ALL SELECT id FROM events WHERE ({aggr} && %s AND %s && ANY({srctgt}))) AS sub GROUP BY id HAVING count(*) = 1"
    column_pairs = (
        ('source_ip', 'source_ip_aggr_ip4'),
        ('target_ip', 'target_ip_aggr_ip4'),
    )
    con = psycopg2.connect(dbname=DBNAME)
    try:
        # The cursor context manager closes the cursor on exit, even on Ctrl-C.
        with con.cursor() as cursor:
            # The composed queries do not depend on the address; build them once.
            queries = [
                (srctgt, sql.SQL(raw_query).format(
                    srctgt=sql.Identifier(srctgt),
                    aggr=sql.Identifier(aggr),
                ))
                for srctgt, aggr in column_pairs
            ]
            mismatch_found = False
            while not mismatch_found:
                count += 1
                ip = str(ipaddress.ip_address(random.getrandbits(BITS)))
                for srctgt, query in queries:
                    cursor.execute(query, (ip, ip, ip))
                    ids = cursor.fetchall()
                    # Read-only probe: end the transaction opened by execute().
                    con.rollback()
                    if ids:
                        # Rows are 1-tuples; flatten them to a plain list of ids.
                        print('\nIP: %s (%s) mismatch in %s' % (ip, srctgt, [eid for eidt in ids for eid in eidt]))
                        mismatch_found = True
                        break
                else:
                    # Progress marker, printed only when both column pairs agreed.
                    print('.', end='')
                    sys.stdout.flush()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop an otherwise endless run.
        pass
    finally:
        print('\nTotal tested %d addresses in %fs' % (count, timeit.default_timer() - start))
        con.close()
38 |
|
39 |
|
40 |
# Start the tester only when this file is executed as a script.
if __name__ == "__main__":
    main()