summaryrefslogtreecommitdiff
path: root/src/db_handler.py
blob: 033b11aa13c1ffcbcd782fb07525b2d3be50934f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os

import psycopg
import yaml

with open(os.path.join('configs', 'database.yml'), 'r') as file:
    db_con_params = yaml.safe_load(file.read())


def get_latest_login_attempts() -> (list[dict], list[str]):
    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT login_attempt.id, username, password, ip, login_attempt.timestamp 
                FROM login_attempt 
                JOIN connection on connection.id = login_attempt.connection 
                ORDER BY login_attempt.id desc limit 20;
                """)

            login_attempts = cur.fetchall()
            col_names = [desc[0] for desc in cur.description]
            return login_attempts, col_names


def get_top(column: str) -> (list[dict], list[str]):
    if column not in ['username', 'password']:
        raise ValueError(f'{column} is not allowed')
    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute(psycopg.sql.SQL("""
                SELECT {column}, COUNT({column}) 
                FROM login_attempt 
                GROUP BY {column}
                ORDER BY COUNT({column}) DESC 
                LIMIT 20;
                """).format(column=psycopg.sql.Identifier(column), ))

            top_usernames = cur.fetchall()
            col_names = [desc[0] for desc in cur.description]
            return top_usernames, col_names


def get_password_of_the_month() -> str:
    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute("""
SELECT password
FROM login_attempt
WHERE timestamp BETWEEN current_timestamp - interval '1 month' AND timestamp
GROUP BY password
ORDER BY COUNT(password) DESC
LIMIT 1;
                """)

            password = cur.fetchone()['password']
            return password


def get_histogram_detailed() -> str:
    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute("""
SELECT count(la.id) as total_count, date_trunc('hour', la.timestamp) as date, cn.ip
FROM login_attempt la
JOIN connection cn on cn.id = la.connection
GROUP BY date_trunc('hour', la.timestamp), cn.ip
ORDER BY date_trunc('hour', la.timestamp) DESC
LIMIT 48;
;
                """)
            histogram = cur.fetchall()
            return histogram


def get_histogram_simple() -> str:
    with psycopg.connect(**db_con_params, row_factory=psycopg.rows.dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute("""
SELECT count(id) as total_count, date_trunc('hour', timestamp) as date
FROM login_attempt
GROUP BY date_trunc('hour', timestamp)
ORDER BY date_trunc('hour', timestamp)
LIMIT 48
;
                """)
            histogram = cur.fetchall()
            return histogram