"""Show a one-word summary of Zabbix trigger state in a status bar.

The module contains a small JSON-RPC client (``Zabbix``), a status-bar
module class (``Py3status``) and a ``main()`` entry point for trying the
client from the command line.

NOTE(review): ``Py3status`` reads ``self.py3`` and ``self.url``, neither of
which is assigned anywhere in this file; presumably the py3status host
injects them (``url`` from the user's configuration) -- confirm.
"""
import os
import sys
import traceback
import urllib.parse
import webbrowser

import requests


class ZabbixError(Exception):
    """Error object returned by the Zabbix API inside a JSON-RPC response.

    The constructor is keyword-only because it is fed directly from the
    ``error`` member of the response (``ZabbixError(**response['error'])``).
    """

    def __init__(self, *, code=None, data=None, message=None):
        self.code = code
        self.data = data
        self.message = message

    def __str__(self):
        return '[{}] {}'.format(self.code, self.message)


class Zabbix:
    """Minimal JSON-RPC client for a Zabbix web frontend."""

    # Methods that are not served by the default endpoint, mapped to the
    # path (relative to the base URL) that is used for them instead.
    METHOD_PATHS = {
        'zabbix.status': 'jsrpc.php?output=json-rpc',
    }
    DEFAULT_PATH = 'api_jsonrpc.php'
    # Seconds to wait for the server before giving up on a request.
    DEFAULT_TIMEOUT = 10

    def __init__(self, url, *, timeout=DEFAULT_TIMEOUT):
        """Create a client for the frontend at *url*.

        Args:
            url: Base URL of the frontend; a trailing slash is optional.
            timeout: Value passed as ``timeout`` to every HTTP request.
                Without one, ``requests`` waits indefinitely and a hung
                server would block the caller forever.
        """
        # Exactly one trailing slash, so urljoin() appends the endpoint
        # path instead of replacing the last path segment.
        self.baseurl = url.rstrip('/') + '/'
        self.timeout = timeout
        # Session token from user.login; None while not logged in.
        self.auth = None

    def get_triggers(self):
        """Return the result of ``trigger.get`` with this module's filter.

        The request asks for ``monitored=True``, ``min_severity=3`` and
        ``filter={'value': 1}``.
        NOTE(review): presumably this selects monitored triggers that are
        currently in the problem state with severity 3 or higher -- confirm
        against the Zabbix API reference.
        """
        return self._call(
            'trigger.get',
            monitored=True,
            min_severity=3,
            filter={
                'value': 1,
            },
        )

    def zabbix_status(self):
        """Return the result of the ``zabbix.status`` call."""
        return self._call('zabbix.status')

    def login(self, user='guest', password=''):
        """Call ``user.login`` and keep the returned value as ``self.auth``."""
        self.auth = self._call('user.login', user=user, password=password)

    def logout(self):
        """Call ``user.logout`` and forget the session token.

        The token is cleared even if the call fails, so a later ``login()``
        starts from a clean state instead of re-sending a dead token.
        """
        try:
            return self._call('user.logout')
        finally:
            self.auth = None

    def _call(self, method, **kwargs):
        """Send one JSON-RPC request and return its ``result`` member.

        Args:
            method: JSON-RPC method name; also selects the endpoint path
                through ``METHOD_PATHS``.
            **kwargs: Sent as the ``params`` object of the request.

        Raises:
            requests.RequestException: On transport failure, timeout or a
                non-success HTTP status (``raise_for_status``).
            ZabbixError: If the response contains an ``error`` member.
        """
        url = urllib.parse.urljoin(
            self.baseurl,
            self.METHOD_PATHS.get(method, self.DEFAULT_PATH),
        )
        data = {
            'jsonrpc': '2.0',
            'method': method,
            'params': kwargs,
            'id': 1,
        }
        # The token is only attached once a login has stored one.
        if self.auth is not None:
            data['auth'] = self.auth
        r = requests.post(
            url,
            json=data,
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            timeout=self.timeout,
        )
        r.raise_for_status()
        response = r.json()
        if 'error' in response:
            raise ZabbixError(**response['error'])
        return response['result']


class Py3status:
    """Status-bar module that renders ``DOWN``/``PROBLEM``/``OK``/``ERR``."""

    def __init__(self):
        self.user = 'guest'
        self.password = ''
        self.format = 'ZBX: {summary}'
        # Lazily created client; see the _zabbix property.
        self.__zabbix = None

    @property
    def _zabbix(self):
        """Return the client, creating it and logging in when needed."""
        if not self.__zabbix:
            self.__zabbix = Zabbix(self.url)
        if not self.__zabbix.auth:
            self.__zabbix.login(self.user, self.password)
        return self.__zabbix

    def kill(self):
        """Log out if a client exists and actually holds a session."""
        # The client can exist without a token when login() failed; there
        # is nothing to log out from in that case.
        if self.__zabbix and self.__zabbix.auth:
            self.__zabbix.logout()

    def zbx_triggers(self):
        """Query Zabbix and return the ``full_text``/``color`` dict.

        Any ordinary exception is printed with its traceback and reported
        as ``ERR`` so one failed poll does not propagate to the caller.
        """
        try:
            if not self._zabbix.zabbix_status()['result']:
                summary = 'DOWN'
                color = self.py3.COLOR_BAD
            elif self._zabbix.get_triggers():
                summary = 'PROBLEM'
                color = self.py3.COLOR_BAD
            else:
                summary = 'OK'
                color = self.py3.COLOR_GOOD
        except Exception:
            # Not a bare "except:", so KeyboardInterrupt and SystemExit
            # still reach the host process.
            traceback.print_exc()
            summary = 'ERR'
            color = self.py3.COLOR_BAD
        return {
            'full_text': self.py3.safe_format(self.format, {
                'summary': summary,
            }),
            'color': color,
        }

    def on_click(self, event):
        """Open ``self.url`` in a web browser when button 1 is clicked."""
        if event['button'] != 1:
            return
        if os.fork():
            # Parent: return immediately. It does not wait for the child.
            return
        # Child process from here on. It must never return into the
        # caller's code, otherwise a second copy of the host keeps running.
        try:
            # Point stdout at the null device before launching the browser.
            # NOTE(review): presumably so browser output cannot end up on
            # the stream the status bar reads -- confirm.
            with open(os.devnull, 'r+') as f:
                os.dup2(f.fileno(), sys.stdout.fileno())
            webbrowser.open(self.url)
        finally:
            os._exit(0)


def main():
    """Log in as guest, print the current triggers, then log out.

    The frontend URL may be given as the first command-line argument;
    without one the built-in default is used.
    """
    if len(sys.argv) > 1:
        url = sys.argv[1]
    else:
        url = 'https://zabbix.securepassage.com/zabbix/'
    z = Zabbix(url)
    z.login()
    try:
        print(z.get_triggers())
    finally:
        z.logout()


if __name__ == '__main__':
    main()