configpolicy/plugins/callback/ntfy.py

95 lines
3.0 KiB
Python

from pathlib import Path
import urllib.request
from ansible.errors import AnsibleError
from ansible.executor.stats import AggregateStats
from ansible.playbook import Playbook
from ansible.plugins.callback import CallbackBase
DOCUMENTATION = r'''
author: Dustin C. Hatch
name: ntfy
short_description: Send notifications to ntfy.sh
description:
- This plugin sends playbook failure notifications via ntfy.sh.
options:
server:
description: ntfy.sh server
type: str
default: https://ntfy.sh
env:
- name: NTFY_SERVER
ini:
- section: callback_ntfy
key: server
topic:
description: ntfy.sh topic name
type: str
default: ansible
env:
- name: NTFY_TOPIC
ini:
- section: callback_ntfy
key: topic
'''
class CallbackModule(CallbackBase):
CALLBACK_VERSION = 1.0
CALLBACK_TYPE = 'notification'
CALLBACK_NAME = 'ntfy'
def __init__(self):
super().__init__()
self.playbook = None
def set_options(self, task_keys=None, var_options=None, direct=None):
super().set_options(task_keys, var_options, direct)
ntfy_server = self.get_option('server').rstrip('/')
if '://' not in ntfy_server:
ntfy_server = f'http://{ntfy_server}'
ntfy_topic = self.get_option('topic')
self.ntfy_url = f'{ntfy_server}/{ntfy_topic}'
def v2_playbook_on_start(self, playbook: Playbook):
self.playbook = playbook
def v2_playbook_on_stats(self, stats: AggregateStats):
if not self.playbook:
return
if not stats.failures and not stats.dark:
return
assert self.playbook._file_name
playbook = Path(self.playbook._file_name)
results = {}
hosts = set(stats.failures.keys()).union(stats.dark.keys())
title = f'Playbook {playbook.name} failed for {len(hosts)} hosts'
for host in hosts:
results[host] = {
'ok': stats.ok.get(host, 0),
'changed': stats.changed.get(host, 0),
'unreachable': stats.dark.get(host, 0),
'failed': stats.failures.get(host, 0),
'skipped': stats.skipped.get(host, 0),
'rescued': stats.rescued.get(host, 0),
'ignored': stats.ignored.get(host, 0),
}
lines = []
for host, result in results.items():
result_txt = ' '.join(f'{k}={v}' for k, v in result.items())
lines.append(f'{host} : {result_txt}')
message = '\n'.join(lines)
req = urllib.request.Request(
self.ntfy_url,
method='POST',
data=message.encode('utf-8'),
)
req.add_header('Title', title)
req.add_header('Tag', 'red_circle')
with urllib.request.urlopen(req) as response:
status_code = response.getcode()
if status_code < 200 or status_code >= 300:
response_data = response.read()
raise AnsibleError(f'Failed to send notification: {response_data.decode()}')