import telegram_notifier # Send notifications import requests # Check HTTP status code import os # Unix socket file handling import subprocess # Ping check if host online import socket # Hostname, unix socket import yaml # Config file import logging # Log messages import signal # Notify before exiting class ColorFormatter(logging.Formatter): grey = "\x1b[90;20m" cyan = "\x1b[96;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" bold_red = "\x1b[31;1m" reset = "\x1b[0m" format = "%(name)s %(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s" FORMATS = { logging.DEBUG: grey + format + reset, logging.INFO: cyan + format + reset, logging.WARNING: yellow + format + reset, logging.ERROR: red + format + reset, logging.CRITICAL: bold_red + format + reset } def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) logger = logging.getLogger(__name__) # Instantiate a logger to be used in this module # Display every message Change this to INFO to see INFO and above (filter out DEBUG) # See: https://docs.python.org/3/howto/logging.html#logging-levels logger.root.setLevel(logging.DEBUG) stream_handler = logging.StreamHandler() # This catches and handles log messages on the root handler stream_handler.setFormatter(ColorFormatter()) logger.root.addHandler(stream_handler) # Disable logging for sub libraries logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("asyncio").setLevel(logging.WARNING) logging.getLogger("telegram").setLevel(logging.WARNING) with open("config.yml", "r") as file: config = yaml.safe_load(file) notifier = telegram_notifier.TelegramNotifier(config['telegram']['token'], config['telegram']['chat_id']) # Set the path for the Unix socket # without reinitialising entire script # Used for SystemD timer to invoke status check, socket_path = "/tmp/server_status.sock" hostname = socket.gethostname() # Used to only notify on state change # Initialise data structure based on targets in config file state = {} for host, host_config in config["hosts"].items(): state[host] = {"online": None, "open_ports": []} if 'urls' in host_config: state[host]["http_status"] = {} for url in host_config['urls']: state[host]['http_status'][url] = None def ping(host: str) -> bool: response = subprocess.run(["ping", "-c", "1", host]) return response.returncode == 0 def check_target(host: str, host_config: dict) -> bool: """ :param host: Hostname or IP :param host_config: Config from config file, specifying what to test and desired state :return: Host is desired state """ # Ping online = ping(host) # Default to desire host online, if not specified in config file desired_online = host_config['desired_online'] if 'desired_online' in host_config else True host_is_desired_state = desired_online == online first_test = state[host]['online'] == None changed_state = not first_test and state[host]['online'] != online logger.debug(f''' host_is_desired_state: {host_is_desired_state} first_test: {first_test} changed_state: {changed_state} ''') # notify if the host online state changed, or on fresh boot, if it isn't the desired online state if changed_state or (first_test and not host_is_desired_state): message = f"❌ `{host}` is {'online' if online else 'offline'}" logger.warning(message) notifier.send_message(message) state[host]['online'] = online # Save current state if online: messages = [] for url in host_config['urls']: http_state = None # HTTP Code try: # print(f''' # ---=== Getting: {url} ===--- # ''') r = requests.get(url) if r.status_code != 200: http_state = r.status_code except requests.exceptions.SSLError as exception: try: reason = exception.args[0].reason.args[0].verify_message except Exception as exception: logger.warning("Unable to get reason") raise http_state = "SSL error" except Exception as exception: raise if http_state: if state[host]['http_status'][url] is None or state[host]['http_status'][url] != http_state: messages.append(f'[{url}]({url}) {http_state}') state[host]['http_status'][url] = http_state if len(messages) > 0: notifier.send_message('❌ ' + '\n'.join(messages)) return host_is_desired_state def check_status() -> bool: """ :return: All hosts are in desired state """ try: logger.info("Checking status") all_hosts_are_in_desired_state = True for host, host_config in config["hosts"].items(): host_is_desired_state = check_target(host, host_config) if host_is_desired_state == False: all_hosts_are_in_desired_state = False return all_hosts_are_in_desired_state except Exception as ex: notifier.send_message("Error getting status") raise ex def listen(): # remove the socket file if it already exists try: os.unlink(socket_path) except OSError: if os.path.exists(socket_path): raise try: # Create the Unix socket server server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # Bind the socket to the path server.bind(socket_path) # Listen for incoming connections server.listen(1) while True: try: # accept connections logger.info("Server is listening for incoming connections...") connection, client_address = server.accept() logger.debug("Connection from", str(connection).split(", ")[0][-4:]) # receive data from the client while True: data = connection.recv(1024) if not data: break logger.debug("Received data:", data.decode()) # Send a response back to the client response = "OK\n" connection.sendall(response.encode()) check_status() finally: # Always close the connection if it"s been initialised if "connection" in locals(): connection.close() finally: # remove the socket file os.unlink(socket_path) def main(): notifier.send_message(f"✅ Server status checker on `{hostname}` *online*") alles_gut = check_status() if alles_gut == True: # First test, notify of outcome # If a non-desired state was caught in first check a notification would have been triggered. notifier.send_message("✅ All hosts are in desired state,\nand no discrepancies found") # Vægter listen() def service_exiting(signum, frame): # Ship going down notifier.send_message(f"❌ Server status checker on `{hostname}` *going down*") exit(0) # Intercept SIGTERM (15) and notify before exiting signal.signal(signal.SIGTERM, service_exiting) if __name__ == "__main__": main()