# summary | refs | log | tree | commit | diff
# path: root/src/server_status.py
# blob: 25670a9a5b127eb5fbfe1d5ad5fbd3852825fc80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import telegram_notifier  # Send notifications
import requests  # Check HTTP status code
import os  # Unix socket file handling
import subprocess  # Ping check if host online
import socket  # Hostname, unix socket
import yaml  # Config file
import logging  # Log messages
import signal  # Notify before exiting


class ColorFormatter(logging.Formatter):
    """Formatter that wraps every log line in an ANSI colour chosen by level.

    Each level listed in ``FORMATS`` gets its own colour. Records with any
    other (custom) level are rendered with the same layout but without
    colour, instead of silently degrading to a bare ``%(message)s``.
    """

    # ANSI SGR escape sequences
    grey = "\x1b[90;20m"
    cyan = "\x1b[96;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    # Layout shared by all levels. Kept under its own name so that it is not
    # shadowed by the format() method defined further down.
    BASE_FORMAT = "%(name)s %(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s"

    # Without an explicit datefmt, %(asctime)s already ends in ",<msecs>" and
    # the milliseconds would be printed twice because BASE_FORMAT appends
    # them as well. This date format stops at whole seconds.
    DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    FORMATS = {
        logging.DEBUG: grey + BASE_FORMAT + reset,
        logging.INFO: cyan + BASE_FORMAT + reset,
        logging.WARNING: yellow + BASE_FORMAT + reset,
        logging.ERROR: red + BASE_FORMAT + reset,
        logging.CRITICAL: bold_red + BASE_FORMAT + reset
    }

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``logging.Formatter``.

        The arguments are forwarded to the base class; the layout actually
        used by format() always comes from ``FORMATS`` / ``BASE_FORMAT``.
        """
        super().__init__(*args, **kwargs)
        # One delegate formatter per level, created on first use so that a
        # new Formatter does not have to be built for every single record.
        self._formatters = {}

    def format(self, record):
        """Return *record* rendered with the colour that matches its level.

        :param record: The ``logging.LogRecord`` to render
        :return: The formatted (and, for known levels, coloured) log line
        """
        formatter = self._formatters.get(record.levelno)
        if formatter is None:
            log_fmt = self.FORMATS.get(record.levelno, self.BASE_FORMAT)
            formatter = logging.Formatter(log_fmt, datefmt=self.DATE_FORMAT)
            self._formatters[record.levelno] = formatter
        return formatter.format(record)


# Logger for this module; its records propagate to the root logger set up below
logger = logging.getLogger(__name__)

# Handler that renders everything reaching the root logger with ColorFormatter
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ColorFormatter())

# DEBUG lets every message through. Change this to INFO to see INFO and above (filter out DEBUG).
# See: https://docs.python.org/3/howto/logging.html#logging-levels
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().addHandler(stream_handler)

# Sub libraries are only allowed to log warnings and above
for _library_name in ("urllib3", "httpx", "httpcore", "asyncio", "telegram"):
    logging.getLogger(_library_name).setLevel(logging.WARNING)

# Read the settings from the YAML config file.
# The path is relative, so it is looked up in the current working directory.
with open("config.yml") as file:
    config = yaml.safe_load(file)

# Telegram client through which this script sends its notifications
notifier = telegram_notifier.TelegramNotifier(
    config['telegram']['token'],
    config['telegram']['chat_id'],
)

# Path of the Unix socket.
# Used for a SystemD timer to invoke a status check
# without reinitialising the entire script.
socket_path = "/tmp/server_status.sock"
hostname = socket.gethostname()

# Last observed results, kept so that a notification is only sent on a state change.
# One entry per target in the config file; None means "not checked yet".
state = {}
for host, host_config in config["hosts"].items():
    state[host] = {"online": None, "open_ports": []}
    if 'urls' not in host_config:
        continue
    # Hosts that list URLs additionally track one HTTP result per URL
    state[host]["http_status"] = {url: None for url in host_config['urls']}


def ping(host: str, timeout: float = 30.0) -> bool:
    """
    Send a single echo request to a host using the system ``ping`` command.

    :param host: Hostname or IP
    :param timeout: Upper bound in seconds for the whole ``ping`` invocation
    :return: True if ``ping`` exited with status 0, False otherwise (including on timeout)
    :raises FileNotFoundError: If no ``ping`` executable can be found
    """
    try:
        response = subprocess.run(
            ["ping", "-c", "1", host],
            # ping's own report would otherwise be interleaved, unformatted,
            # with the log output. Its error messages (stderr) stay visible.
            stdout=subprocess.DEVNULL,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # A ping that never finishes must not block all further checks
        logger.warning("ping to %s did not finish within %s seconds", host, timeout)
        return False
    return response.returncode == 0


def _probe_url(url: str, timeout: float = 10.0):
    """
    Request a URL once and describe what is wrong with it, if anything.

    :param url: URL to request
    :param timeout: Timeout in seconds handed to ``requests.get``
    :return: None for HTTP 200; otherwise the HTTP status code, "SSL error",
             or the class name of the ``requests`` exception that was raised
    """
    try:
        # Without a timeout a server that never answers would block the checker forever
        response = requests.get(url, timeout=timeout)
    except requests.exceptions.SSLError as exception:
        try:
            reason = exception.args[0].reason.args[0].verify_message
        except (AttributeError, IndexError, TypeError):
            # Not every SSL failure carries a verify message; fall back to the full text
            reason = str(exception)
        logger.warning("SSL error for %s: %s", url, reason)
        return "SSL error"
    except requests.exceptions.RequestException as exception:
        # Refused connections, timeouts etc. are a finding to report,
        # not a reason to take the whole checker down
        logger.warning("Request to %s failed: %s", url, exception)
        return type(exception).__name__
    if response.status_code != 200:
        return response.status_code
    return None


def check_target(host: str, host_config: dict) -> bool:
    """
    Ping a host and, if it is online, request each of its configured URLs.
    Notifications are only sent when a result differs from the previous one
    stored in ``state`` (or, on the first check, when it is not as desired).

    :param host: Hostname or IP
    :param host_config: Config from config file, specifying what to test and desired state
    :return: Host is desired state
    """
    # Ping
    online = ping(host)
    # Default to desire host online, if not specified in config file
    desired_online = host_config.get('desired_online', True)
    host_is_desired_state = desired_online == online
    host_state = state[host]
    first_test = host_state['online'] is None
    changed_state = not first_test and host_state['online'] != online
    logger.debug(f'''
    host_is_desired_state: {host_is_desired_state}
    first_test:            {first_test}
    changed_state:         {changed_state}
    ''')
    # notify if the host online state changed, or on fresh boot, if it isn't the desired online state
    if changed_state or (first_test and not host_is_desired_state):
        # A change back to the desired state is good news and is marked as such
        marker = '✅' if host_is_desired_state else '❌'
        message = f"{marker} `{host}` is {'online' if online else 'offline'}"
        if host_is_desired_state:
            logger.info(message)
        else:
            logger.warning(message)
        notifier.send_message(message)
    host_state['online'] = online  # Save current state

    # 'urls' is optional in the config file
    urls = host_config.get('urls') or []
    if online and urls:
        url_states = host_state.setdefault('http_status', {})
        messages = []
        for url in urls:
            http_state = _probe_url(url)
            if http_state is None:
                # Healthy again: forget the old error so that a repeat of
                # the same failure triggers a new notification
                url_states[url] = None
                continue
            if url_states.get(url) != http_state:
                messages.append(f'[{url}]({url}) {http_state}')
                url_states[url] = http_state

        if messages:
            notifier.send_message('❌ ' + '\n'.join(messages))

    return host_is_desired_state


def check_status() -> bool:
    """
    Check every host listed in the config file.

    :return: All hosts are in desired state
    :raises Exception: Any error raised while checking is re-raised after
                       it has been logged and a notification has been sent
    """
    logger.info("Checking status")
    try:
        # Built as a list (not fed lazily into all()) so that every host is
        # checked even after one of them turned out not to be in desired state
        results = [
            check_target(host, host_config)
            for host, host_config in config["hosts"].items()
        ]
    except Exception:
        # Keep the traceback in the log; the notification itself carries no details
        logger.exception("Error getting status")
        notifier.send_message("Error getting status")
        raise
    return all(results)


def _remove_socket_file() -> None:
    """Delete the socket file; a file that does not exist is not an error."""
    try:
        os.unlink(socket_path)
    except OSError:
        if os.path.exists(socket_path):
            raise


def listen():
    """
    Serve status check requests on the Unix socket at ``socket_path``.

    Clients are handled one at a time. Every chunk of data received is
    answered with ``OK`` and triggers a status check. Runs until an
    exception (including SystemExit from the signal handler) ends the loop;
    the sockets are closed and the socket file is removed on the way out.
    """
    # remove the socket file if it already exists
    _remove_socket_file()

    try:
        # Create the Unix socket server; the with-block closes it on exit
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
            # Bind the socket to the path
            server.bind(socket_path)

            # Listen for incoming connections
            server.listen(1)

            while True:
                # accept connections
                logger.info("Server is listening for incoming connections...")
                connection, _ = server.accept()

                # Always close the connection, also when the check raises
                with connection:
                    logger.debug("Connection from fd=%s", connection.fileno())

                    # receive data from the client
                    while True:
                        data = connection.recv(1024)
                        if not data:
                            break
                        # errors="replace": bytes that are not valid UTF-8 must not stop the server
                        logger.debug("Received data: %s", data.decode(errors="replace"))

                        # Send a response back to the client
                        connection.sendall(b"OK\n")

                        check_status()
    finally:
        # remove the socket file; tolerate it being absent (e.g. bind failed)
        # so that the cleanup does not hide the original error
        _remove_socket_file()


def main():
    """Announce the start, run a first status check, then wait for check requests."""
    notifier.send_message(f"✅ Server status checker on `{hostname}` *online*")

    initial_check_passed = check_status()
    # Only the all-clear is reported here: a non-desired state found by the
    # first check has already triggered its own notification.
    if initial_check_passed:
        notifier.send_message("✅ All hosts are in desired state,\nand no discrepancies found")

    listen()


def service_exiting(signum, frame):
    """
    Signal handler: send a notification that the checker is going down, then exit.

    :param signum: Number of the signal that was received (unused)
    :param frame: Stack frame that was interrupted by the signal (unused)
    :raises SystemExit: Always, to end the process with exit status 0
    """
    # Ship going down
    notifier.send_message(f"❌ Server status checker on `{hostname}` *going down*")
    # The exit() builtin is a helper added by the site module for interactive
    # sessions and is not guaranteed to exist; raising SystemExit is what it does.
    raise SystemExit(0)


# Intercept SIGTERM (15) and notify before exiting.
# The handler is registered at module level, so this also happens when the
# module is imported rather than run as a script.
signal.signal(signal.SIGTERM, service_exiting)

# Start the checker only when this file is executed as a script
if __name__ == "__main__":
    main()