From a7319c561d034addb0f59952ea56a036dc8d119a Mon Sep 17 00:00:00 2001 From: "Dustin C. Hatch" Date: Wed, 17 May 2023 14:51:21 -0500 Subject: [PATCH] journal2ntfy: Script to send log messagess via ntfy The `journal2ntfy.py` script follows the systemd journal by spawning `journalctl` as a child process and reading from its standard output stream. Any command-line arguments passed to `journal2ntfy` are passed to `journalctl`, which allows the caller to specify message filters. For any matching journal message, `journal2ntfy` sends a message via the *ntfy* web service. For the BURP server, we're going to use `journal2ntfy` to generate alerts about the RAID array. When I reconnect the disk that was in the fireproof safe, the kernel will log a message from the *md* subsystem indicating that the resynchronization process has begun. Then, when the disks are again in sync, it will log another message, which will let me know it is safe to archive the other disk. --- group_vars/burp-server.yml | 2 + hosts | 3 + journal2ntfy.yml | 4 + roles/journal2ntfy/files/journal2ntfy.py | 169 ++++++++++++++++++ roles/journal2ntfy/files/journal2ntfy.service | 29 +++ roles/journal2ntfy/handlers/main.yml | 4 + roles/journal2ntfy/tasks/main.yml | 51 ++++++ .../templates/journal2ntfy.env.j2 | 3 + 8 files changed, 265 insertions(+) create mode 100644 journal2ntfy.yml create mode 100644 roles/journal2ntfy/files/journal2ntfy.py create mode 100644 roles/journal2ntfy/files/journal2ntfy.service create mode 100644 roles/journal2ntfy/handlers/main.yml create mode 100644 roles/journal2ntfy/tasks/main.yml create mode 100644 roles/journal2ntfy/templates/journal2ntfy.env.j2 diff --git a/group_vars/burp-server.yml b/group_vars/burp-server.yml index 54ec5d1..d515aa5 100644 --- a/group_vars/burp-server.yml +++ b/group_vars/burp-server.yml @@ -3,3 +3,5 @@ burp_notify: gyrfalcon@ebonfire.com collectd_processes: - name: burp + +journal2ntfy_filters: _TRANSPORT=kernel --grep ^md diff --git a/hosts b/hosts index 9665bbf..ca6abba 100644 --- a/hosts +++ b/hosts @@ -74,6 +74,9 @@ hass2.pyrocufflink.blue [jenkins-slave] +[journal2ntfy:children] +burp-server + [k8s-controller] k8s-ctrl0.pyrocufflink.blue diff --git a/journal2ntfy.yml b/journal2ntfy.yml new file mode 100644 index 0000000..e3af3aa --- /dev/null +++ b/journal2ntfy.yml @@ -0,0 +1,4 @@ +- hosts: journal2ntfy + roles: + - role: journal2ntfy + tags: journal2ntfy diff --git a/roles/journal2ntfy/files/journal2ntfy.py b/roles/journal2ntfy/files/journal2ntfy.py new file mode 100644 index 0000000..e973a5a --- /dev/null +++ b/roles/journal2ntfy/files/journal2ntfy.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +import asyncio +import datetime +import json +import logging +import os +import shlex +import signal +import sys +import syslog +import urllib.error +import urllib.request +from typing import Any, AsyncIterator + + +log = logging.getLogger('journal2ntfy') + + +Json = dict[str, Any] + + +NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue') +NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'alerts') + + +ntfy_lock = asyncio.Lock() + + +async def follow_journal(*filters: str) -> AsyncIterator[Json]: + cmd = [ + 'journalctl', + '--since=now', + '--output=json', + '--follow', + ] + cmd += filters + if log.isEnabledFor(logging.DEBUG): + log.debug( + 'Running command: %s', ' '.join(shlex.quote(str(a)) for a in cmd) + ) + p = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + assert p.stdout + try: + async for line in p.stdout: + try: + data = json.loads(line) + except ValueError as e: + log.error('Failed to parse journal entry: %s', e) + continue + try: + yield data + except Exception: + log.exception('Error handling message:') + continue + except asyncio.CancelledError: + log.debug('Terminating child process') + p.terminate() + try: + await asyncio.wait_for(p.wait(), 10) + except asyncio.TimeoutError: + log.warning('Child process did not terminate when asked, killing') + p.kill() + await p.wait() + raise + + +async def on_message(data: Json) -> None: + message = data.get('MESSAGE') + if not message: + log.debug('Ignoring journal record with empty message') + return + + try: + timestamp = datetime.datetime.fromtimestamp( + int(data['__REALTIME_TIMESTAMP']) / 1e6 + ) + except (KeyError, ValueError) as e: + log.warning('Could not parse message timestamp: %s', e) + timestamp = datetime.datetime.now() + + try: + hostname = data['_HOSTNAME'] + except KeyError: + hostname = os.uname().nodename + + try: + ident = data['SYSLOG_IDENTIFIER'] + except KeyError: + ident = data.get('_TRANSPORT', '') + + message = ( + f'{timestamp.strftime("%b %d %H:%M:%S")} {hostname} {ident}: ' + f'{message}' + ) + + try: + priority = int(data['PRIORITY']) + except (KeyError, ValueError): + priority = 0 + if priority <= syslog.LOG_CRIT: + tag = 'rotating_light' + elif priority == syslog.LOG_ERR: + tag = 'red_circle' + elif priority == syslog.LOG_WARNING: + tag = 'warning' + elif priority == syslog.LOG_NOTICE: + tag = 'pushpin' + elif priority == syslog.LOG_INFO: + tag = 'information_source' + else: + tag = 'white_circle' + + req = urllib.request.Request( + f'{NTFY_URL}/{NTFY_TOPIC}', + data=message.encode('utf-8'), + headers={ + 'Tags': tag, + 'Content-Type': 'text/plain; charset=utf-8', + }, + ) + try: + # The lock ensures that messages are delivered in the order + # they were read from the journal, while `to_thread` keeps the + # network request from blocking the event loop. + async with ntfy_lock: + res = await asyncio.to_thread(urllib.request.urlopen, req) + except urllib.error.URLError as e: + log.error('Could not send ntfy notification: %s', e) + return + if res.status == 200: + log.debug('Successfully sent ntfy notification') + else: + log.error('Sending ntfy notification failed: %s', res.reason) + + +def shutdown(signum: int) -> None: + log.info('Received %s, shutting down', signal.Signals(signum).name) + cur_task = asyncio.current_task() + for task in asyncio.all_tasks(): + if task is not cur_task: + task.cancel() + + +async def main(): + logging.basicConfig(level=logging.DEBUG) + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGTERM, shutdown, signal.SIGTERM) + loop.add_signal_handler(signal.SIGINT, shutdown, signal.SIGINT) + + if len(sys.argv) > 1: + filters = sys.argv[1:] + else: + filters = shlex.split(os.environ.get('JOURNAL2NTFY_FILTERS', '')) + + try: + async for data in follow_journal(*filters): + await on_message(data) + except asyncio.CancelledError: + return + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/roles/journal2ntfy/files/journal2ntfy.service b/roles/journal2ntfy/files/journal2ntfy.service new file mode 100644 index 0000000..a0f9511 --- /dev/null +++ b/roles/journal2ntfy/files/journal2ntfy.service @@ -0,0 +1,29 @@ +[Unit] +Description=Send kernel messages from md via ntfy +Wants=network-online.target +After=network-online.target + +[Service] +Type=exec +EnvironmentFile=-/etc/sysconfig/journal2ntfy +ExecStart=/usr/local/bin/journal2ntfy +DevicePolicy=closed +MemoryDenyWriteExecute=yes +PrivateDevices=yes +PrivateTmp=yes +PrivateUsers=yes +ProtectClock=yes +ProtectHome=yes +ProtectKernelLogs=yes +ProtectKernelModules=yes +ProtectKernelTunables=yes +ProtectProc=invisible +ProtectSystem=strict +RestrictRealtime=yes +RestrictSUIDSGID=yes +SystemCallFilter=@system-service +SystemCallFilter=~@privileged @resources +UMask=0077 + +[Install] +WantedBy=multi-user.target diff --git a/roles/journal2ntfy/handlers/main.yml b/roles/journal2ntfy/handlers/main.yml new file mode 100644 index 0000000..0652988 --- /dev/null +++ b/roles/journal2ntfy/handlers/main.yml @@ -0,0 +1,4 @@ +- name: restart journal2ntfy + service: + name: journal2ntfy + state: restarted diff --git a/roles/journal2ntfy/tasks/main.yml b/roles/journal2ntfy/tasks/main.yml new file mode 100644 index 0000000..a897362 --- /dev/null +++ b/roles/journal2ntfy/tasks/main.yml @@ -0,0 +1,51 @@ +- name: ensure journal2ntfy script is installed + copy: + src: journal2ntfy.py + dest: /usr/local/bin/journal2ntfy + owner: root + group: root + mode: u=rwx,go=rx + notify: + - restart journal2ntfy + tags: + - install + +- name: ensure journal2ntfy.service systemd unit is installed + copy: + src: journal2ntfy.service + dest: /etc/systemd/system/journal2ntfy.service + owner: root + group: root + mode: u=rw,go=r + notify: + - restart journal2ntfy + tags: + - systemd + +- name: ensure journal2ntfy is configured + template: + src: journal2ntfy.env.j2 + dest: /etc/sysconfig/journal2ntfy + owner: root + group: root + mode: u=rw,go=r + notify: + - restart journal2ntfy + tags: + - config + +- name: ensure journal2ntfy service is enabled + service: + name: journal2ntfy + enabled: true + tags: + - service + +- meta: flush_handlers + +- name: ensure journal2ntfy is running + service: + name: journal2ntfy + state: started + tags: + - service diff --git a/roles/journal2ntfy/templates/journal2ntfy.env.j2 b/roles/journal2ntfy/templates/journal2ntfy.env.j2 new file mode 100644 index 0000000..0c07e4b --- /dev/null +++ b/roles/journal2ntfy/templates/journal2ntfy.env.j2 @@ -0,0 +1,3 @@ +{% if journal2ntfy_filters|d(none) %} +JOURNAL2NTFY_FILTERS={{ journal2ntfy_filters }} +{% endif %}