"""Read-only reporting queries over the ``login_attempt`` and ``connection`` tables.

Connection parameters are loaded once, at import time, from
``configs/database.yml`` (resolved against the current working directory).
Every public function opens its own connection for the duration of a single
query and returns rows as dictionaries keyed by column name.
"""
import os
from typing import Optional, Union

import psycopg
import yaml
from psycopg import sql
from psycopg.rows import dict_row

# Keyword arguments forwarded verbatim to ``psycopg.connect``.
with open(os.path.join('configs', 'database.yml'), 'r') as file:
    db_con_params = yaml.safe_load(file.read())

# Columns that ``get_top`` may group by. The column name is interpolated into
# the statement as an identifier, so it is restricted to this fixed set.
_ALLOWED_TOP_COLUMNS = ('username', 'password')


def _fetch_all(query: Union[str, sql.Composable]) -> tuple[list[dict], list[str]]:
    """Run *query* on a fresh connection and return ``(rows, column_names)``.

    Rows are produced by the ``dict_row`` row factory; column names are taken
    from the cursor description in result order.
    """
    with psycopg.connect(**db_con_params, row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            rows = cur.fetchall()
            col_names = [desc[0] for desc in cur.description]
    return rows, col_names


def get_latest_login_attempts() -> tuple[list[dict], list[str]]:
    """Return the 20 login attempts with the highest ids, newest id first.

    Returns:
        ``(rows, column_names)`` where each row has the keys ``id``,
        ``username``, ``password``, ``ip`` and ``timestamp``.
    """
    return _fetch_all("""
        SELECT login_attempt.id, username, password, ip, login_attempt.timestamp
        FROM login_attempt
        JOIN connection on connection.id = login_attempt.connection
        ORDER BY login_attempt.id desc
        limit 20;
    """)


def get_top(column: str) -> tuple[list[dict], list[str]]:
    """Return the 20 most frequent values of *column* in ``login_attempt``.

    Args:
        column: Column to aggregate on; must be ``'username'`` or
            ``'password'``.

    Returns:
        ``(rows, column_names)``; each row holds the value and its count,
        ordered by count descending.

    Raises:
        ValueError: If *column* is not one of the allowed columns.
    """
    if column not in _ALLOWED_TOP_COLUMNS:
        raise ValueError(f'{column} is not allowed')

    # Identifier() quotes the column name; it is never spliced in as raw text.
    query = sql.SQL("""
        SELECT {column}, COUNT({column})
        FROM login_attempt
        GROUP BY {column}
        ORDER BY COUNT({column}) DESC
        LIMIT 20;
    """).format(column=sql.Identifier(column))
    return _fetch_all(query)


def get_password_of_the_month() -> Optional[str]:
    """Return the most frequently attempted password of the last month.

    Only attempts whose timestamp is no older than one month before the
    database's ``current_timestamp`` are counted.

    Returns:
        The password with the highest count, or ``None`` when no login
        attempt falls inside that window.
    """
    # The original upper bound ("AND timestamp") compared the column with
    # itself and never filtered anything; a plain lower bound selects the
    # same rows.
    rows, _ = _fetch_all("""
        SELECT password
        FROM login_attempt
        WHERE timestamp >= current_timestamp - interval '1 month'
        GROUP BY password
        ORDER BY COUNT(password) DESC
        LIMIT 1;
    """)
    # No matching rows must not crash the caller.
    return rows[0]['password'] if rows else None


def get_histogram_detailed() -> list[dict]:
    """Return login-attempt counts per hour and per source IP.

    Rows have the keys ``total_count``, ``date`` (timestamp truncated to the
    hour) and ``ip``, ordered by hour descending. The limit of 48 applies to
    (hour, ip) rows, not to distinct hours.
    """
    rows, _ = _fetch_all("""
        SELECT count(la.id) as total_count,
               date_trunc('hour', la.timestamp) as date,
               cn.ip
        FROM login_attempt la
        JOIN connection cn on cn.id = la.connection
        GROUP BY date_trunc('hour', la.timestamp), cn.ip
        ORDER BY date_trunc('hour', la.timestamp) DESC
        LIMIT 48;
    """)
    return rows


def get_histogram_simple() -> list[dict]:
    """Return login-attempt counts per hour.

    Rows have the keys ``total_count`` and ``date`` (timestamp truncated to
    the hour), ordered by hour ascending and limited to 48 rows.
    """
    # NOTE(review): ascending order + LIMIT yields the *earliest* 48 hourly
    # buckets, whereas get_histogram_detailed sorts descending. Confirm which
    # one is intended before changing either query.
    rows, _ = _fetch_all("""
        SELECT count(id) as total_count,
               date_trunc('hour', timestamp) as date
        FROM login_attempt
        GROUP BY date_trunc('hour', timestamp)
        ORDER BY date_trunc('hour', timestamp)
        LIMIT 48;
    """)
    return rows