import logging import random import os import signal import sys import typing import urllib import pathlib import flask import markdown import markupsafe import yaml import tabulate import article import db_handler import telegram_handler class CustomFormatter(logging.Formatter): grey = "\x1b[90;20m" blue = "\x1b[34;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" bold_red = "\x1b[31;1m" reset = "\x1b[0m" format = "%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s" FORMATS = { logging.DEBUG: grey + format + reset, logging.INFO: blue + format + reset, logging.WARNING: yellow + format + reset, logging.ERROR: red + format + reset, logging.CRITICAL: bold_red + format + reset } def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) logger = logging.getLogger(__name__) debug = 'PROD' not in os.environ logger.setLevel(logging.DEBUG if debug else logging.WARNING) logging.getLogger('werkzeug').setLevel(logging.DEBUG if debug else logging.WARNING) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(CustomFormatter()) logger.root.addHandler(stream_handler) logger.info(f"Debug: {'en' if debug else 'dis'}abled") content_root_path = os.environ["CONTENT_ROOT_PATH"] if "CONTENT_ROOT_PATH" in os.environ else './blog.node5.net/' article_generator = article.ArticleGenerator(os.path.join(content_root_path, 'articles')) def get_articles() -> article.Folder: try: articles = article_generator.discover_articles() return articles except article.ArticleHandlerException as exception: # Known exceptions logger.error(exception) exit(1) # Exit code 1: Code for generic error if len(sys.argv) > 1 and sys.argv[1] == 'validate': get_articles() logger.info('OK') exit(0) template_path = os.path.join(os.path.abspath(content_root_path), 'templates') logger.debug(f"template path: {template_path}") app = flask.Flask(__name__, template_folder=template_path, static_folder=os.path.join(os.path.abspath(content_root_path), 'static'), static_url_path='') with open(os.path.join(content_root_path, 'motd.md'), 'r') as file: motd_list = [markupsafe.Markup(markdown.markdown(line)) for line in file.readlines()] folders_by_url: typing.Dict[str, article.Folder] = {} articles_by_url: typing.Dict[str, article.Article] = {} # DB init db: db_handler.DBHandler = db_handler.DBHandler() # Telegram init try: with open(os.path.join('configs', 'telegram.yml'), 'r') as file: telegram_conf = yaml.safe_load(file.read()) telegram = telegram_handler.Telegram(telegram_conf) except FileNotFoundError as ex: logger.warning("Telegram config - Not found, running without") @app.context_processor # Always inject site title to all render_templates def inject_common(): url = strip_trailing_slash(flask.request.path) comments, comment_headers, comments_sql = db.get_comments(url) if comments: comments_formatted = tabulate.tabulate(comments, comment_headers, tablefmt="psql") else: comments_formatted = "" insert_sql_pretty = markdown.markdown(f"```sql\nINSERT INTO comment(nickname, visitor_url, contact, show_contact, comment, public) VALUES (\n```", extensions=['fenced_code','codehilite']) sql_pretty = markdown.markdown(f"```sql{ comments_sql }```", extensions=['fenced_code','codehilite']) args = { 'insert_sql_pretty': insert_sql_pretty, 'comments_sql': markupsafe.Markup(markdown.markdown(sql_pretty)), 'comments': comments_formatted, 'title': pathlib.Path(content_root_path).name, 'motd': random.choice(motd_list) } return args def strip_trailing_slash(path): if path != '/': path = path if not path[len(path) - 1] == '/' else path[:-1] return path @app.route('/motd') def motd_debug(): return ''+'\n'.join(motd_list) @app.route('/comment', methods=['POST']) def post_comment(): args = {'page_url': strip_trailing_slash(urllib.parse.unquote(urllib.parse.urlparse(flask.request.referrer).path)), 'public': False, 'show_contact': False} for key, value in flask.request.form.items(): if key in ('nickname', 'visitor_url', 'contact', 'show_contact', 'public', 'comment'): if key in ('public', 'show_contact'): value = value == 'on' args[key] = value # Notify new message, share details if user indicated it's okay notification_message = f"*New comment on blog*" if args.get('public'): notification_message += f"\n{args['comment']}" for key in ('nickname', 'visitor_url', 'contact'): if args.get(key): if key == 'contact' and not args['show_contact']: continue # Skip contact, if not public for privacy notification_message += f'\n{key[0].upper()}{key[1:]}: {args.get(key)}' telegram.send_message(notification_message) db.post_comment(**args) return flask.redirect(flask.request.referrer) def view_folder(): path = flask.request.path if path != '/': path = strip_trailing_slash(flask.request.path) folder = folders_by_url[path] return flask.render_template('folder.html', folder=folder) def view_article(): path = strip_trailing_slash(flask.request.path) article = articles_by_url[path] return flask.render_template('article.html', article=article) def register_urls(folder: article.Folder): # Use recursion to traverse folder tree structure if strip_trailing_slash(folder.url) not in folders_by_url.keys(): # If statement is because it can be called multiple times, which would cause an error app.add_url_rule(folder.url, view_func=view_folder) folders_by_url[strip_trailing_slash(folder.url) if folder.url != '/' else folder.url] = folder for article in folder.articles: if article.url not in articles_by_url.keys(): # If statement is because it can be called multiple times, which would cause an error logger.debug(f"Registering url: {article.url}, static path: {article.folder_path}") blueprint_args = {} # Some articles are one file, and don't have a folder, hence argument would give an error if article.folder_path: blueprint_args['static_folder'] = os.path.abspath(article.folder_path) blueprint_args['static_url_path'] = '' blueprint = flask.Blueprint(article.name, __name__, url_prefix=article.url, **blueprint_args) blueprint.add_url_rule('/', view_func=view_article) app.register_blueprint(blueprint) # app.add_url_rule(article.url, view_func=view_article) articles_by_url[article.url] = article for sub_folder in folder.sub_folders.values(): register_urls(sub_folder) def load_articles(): logger.info('Loading articles') articles = get_articles() register_urls(articles) load_articles() def signal_handler(signum, frame): logger.debug(f"Signal Number: {signum}, Frame {frame}") load_articles() signal.signal(signal.SIGHUP, signal_handler) def main(): global app app.run() if __name__ == '__main__': main()