1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import os
import psycopg
import yaml
# Load the database connection settings once, at import time. The parsed
# mapping is unpacked as keyword arguments to psycopg.connect() by the query
# helpers in this module.
# The encoding is passed explicitly so decoding does not depend on the
# platform's locale default (YAML streams are Unicode; UTF-8 is assumed here).
with open(os.path.join('configs', 'database.yml'), 'r', encoding='utf-8') as file:
    db_con_params = yaml.safe_load(file.read())
def get_latest_login_attempts() -> tuple[list[dict], list[str]]:
    """Fetch the 20 login attempts with the highest ids.

    Each login attempt is joined with its ``connection`` row. Rows are
    ordered by ``login_attempt.id`` descending, so these are the most
    recently inserted attempts provided ids grow over time.

    A new database connection is opened from ``db_con_params`` for the call
    and released when the ``with`` block exits.

    Returns:
        A ``(rows, column_names)`` pair: ``rows`` is a list of dicts (one per
        result row, produced by the ``dict_row`` row factory) and
        ``column_names`` lists the result column names in SELECT order.
    """
    query = """
        SELECT login_attempt.id, username, password, ip, login_attempt.timestamp
        FROM login_attempt
        JOIN connection on connection.id = login_attempt.connection
        ORDER BY login_attempt.id desc limit 20;
    """
    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            login_attempts = cur.fetchall()
            # The first item of each description entry is the column name.
            col_names = [desc[0] for desc in cur.description]
    return login_attempts, col_names
def get_top(column: str) -> tuple[list[dict], list[str]]:
    """Return the 20 most frequent values of *column* in ``login_attempt``.

    Args:
        column: Column to aggregate; must be ``'username'`` or ``'password'``.

    Returns:
        A ``(rows, column_names)`` pair: ``rows`` is a list of dicts (one per
        result row, produced by the ``dict_row`` row factory) holding the
        value and its count, ordered by count descending; ``column_names``
        lists the result column names in SELECT order.

    Raises:
        ValueError: If *column* is not one of the allowed column names. The
            check happens before any database connection is opened.
    """
    allowed_columns = ('username', 'password')
    if column not in allowed_columns:
        raise ValueError(f'{column} is not allowed')

    # The column name cannot be sent as a bound query parameter, so it is
    # composed into the statement as a quoted SQL identifier instead.
    # COUNT(<column>) counts only non-NULL values of that column.
    query = psycopg.sql.SQL("""
        SELECT {column}, COUNT({column})
        FROM login_attempt
        GROUP BY {column}
        ORDER BY COUNT({column}) DESC
        LIMIT 20;
    """).format(column=psycopg.sql.Identifier(column))

    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            top_values = cur.fetchall()
            # The first item of each description entry is the column name.
            col_names = [desc[0] for desc in cur.description]
    return top_values, col_names
|