#!/opt/public_ssh_password_stats/log_ssh_passwords/venv/bin/python3
"""SSH listener that records every password login attempt and rejects it.

Each incoming TCP connection is registered through ``db_handler`` and handed
to a paramiko server-mode transport. Every username/password pair offered by
the client is printed, stored through ``db_handler`` and then refused.

Set the environment variable ``DEBUG=1`` to listen on 127.0.0.1:2200 instead
of port 22 on all interfaces.
"""
import os
import socket
import threading
import time

import paramiko

import db_handler


class SSHServer(paramiko.ServerInterface):
    """Paramiko server interface that logs password attempts and denies them."""

    def __init__(self, connection_id: int):
        """Remember the connection id that login attempts are logged under.

        Args:
            connection_id: Value returned by ``db_handler.log_connection`` for
                the connection this instance serves.
        """
        self.connection_id = connection_id

    def check_auth_password(self, username: str, password: str) -> int:
        """Record the offered credentials and always refuse the login.

        Args:
            username: Username sent by the client.
            password: Password sent by the client.

        Returns:
            ``paramiko.AUTH_FAILED``, unconditionally.
        """
        print(f"Username: {username}")
        print(f"Password: {password}")

        db_handler.log_login_attempt(username, password, self.connection_id)

        # Be a nice internet citizen, by slowing down bots a tad
        time.sleep(9)

        return paramiko.AUTH_FAILED


def ssh_thread(client, address, port):
    """Register one accepted connection and start SSH negotiation on it.

    Meant to run in its own thread so a slow client cannot block the accept
    loop. Failures are logged and the connection is closed instead of letting
    the exception escape the thread.

    Args:
        client: Connected socket returned by ``accept()``.
        address: Remote IP address of the client.
        port: Remote port of the client.
    """
    print(f'New connection from: {address}:{port}')

    transport = None
    try:
        connection_id = db_handler.log_connection(address, port)

        # Create a new paramiko transport
        transport = paramiko.Transport(client)
        transport.add_server_key(host_key)

        ssh_server = SSHServer(connection_id)

        # Start the server. With no event argument this blocks until
        # negotiation finishes and raises if it fails; authentication is then
        # handled on the transport's own thread, so the transport must stay
        # open when this call succeeds.
        transport.start_server(server=ssh_server)
    except Exception as exc:
        # Thread boundary: clients that are not speaking SSH (scanners,
        # dropped connections) end up here. Log and release the connection.
        print(f'Connection from {address}:{port} failed: {exc!r}')
        if transport is not None:
            transport.close()
        else:
            client.close()


# Create an SSH server
# Alternative for throwaway setups: paramiko.RSAKey.generate(2048)
host_key = paramiko.RSAKey.from_private_key_file("id_rsa")

if os.environ.get('DEBUG') == '1':
    # Local development
    server = '127.0.0.1'
    port = 2200
else:
    # Production
    server = ''
    port = 22

# Start the server
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((server, port))
server_socket.listen(100)

print(f"[*] Listening for connections on {server}:{port}")

try:
    while True:
        client = None
        try:
            client, addr = server_socket.accept()
            thread = threading.Thread(
                target=ssh_thread,
                args=(client, addr[0], addr[1]),
            )
            thread.start()
        except Exception as exc:
            # Keep serving on errors, but report them and do not leak the
            # accepted socket when the handler thread could not be started.
            print(f"[!] Failed to handle incoming connection: {exc!r}")
            if client is not None:
                client.close()
            # Avoid a busy loop if accept() keeps failing (e.g. out of
            # file descriptors).
            time.sleep(0.1)
except KeyboardInterrupt:
    print("[*] Shutting down")
finally:
    server_socket.close()