From 88130dd72f9d4888d4bd6e2c2f2344591dbacd1c Mon Sep 17 00:00:00 2001 From: "Dustin C. Hatch" Date: Mon, 3 Feb 2025 20:22:09 -0600 Subject: [PATCH] host/online: AMQP connection/auth config The `AMQPContext` now supports reading connection and authentication information from environment variables, allowing it to connect to RabbitMQ servers other than the default (`localhost:6372`, user _guest_). It supports plain and TLS connection modes, as well as plain username+password or EXTERNAL authentication. --- dch_webhooks.py | 56 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/dch_webhooks.py b/dch_webhooks.py index 8d51ed7..4006440 100644 --- a/dch_webhooks.py +++ b/dch_webhooks.py @@ -7,6 +7,7 @@ import json import logging import os import re +import ssl from pathlib import Path from types import TracebackType from typing import Any, Optional, Self, Type @@ -15,6 +16,7 @@ import fastapi import httpx import pika import pika.channel +import pika.credentials import pydantic import pyrfc6266 import ruamel.yaml @@ -64,6 +66,7 @@ PAPERLESS_URL = os.environ.get( ANSIBLE_JOB_YAML = Path(os.environ.get('ANSIBLE_JOB_YAML', 'ansible-job.yaml')) ANSIBLE_JOB_NAMESPACE = os.environ.get('ANSIBLE_JOB_NAMESPACE', 'ansible') +HOST_INFO_QUEUE = os.environ.get('HOST_INFO_QUEUE', 'host-provisioner') KUBERNETES_TOKEN_PATH = Path( os.environ.get( 'KUBERNETES_TOKEN_PATH', @@ -285,11 +288,54 @@ class AMQPError(Exception): ... class AMQPContext: - def __init__(self) -> None: - self._conn_params = pika.ConnectionParameters() + def __init__( + self, + conn_params: Optional[ + pika.ConnectionParameters | pika.URLParameters + ] = None, + ) -> None: + self._conn_params = conn_params self._connection: Optional[AsyncioConnection] = None self._channel: Optional[pika.channel.Channel] = None + @classmethod + def from_env(cls) -> Self: + if 'AMQP_URL' in os.environ: + params = pika.URLParameters(os.environ['AMQP_URL']) + else: + kwargs = {} + if host := os.environ.get('AMQP_HOST'): + kwargs['host'] = host + if port := os.environ.get('AMQP_PORT'): + kwargs['port'] = int(port) + if vhost := os.environ.get('AMQP_VIRTUAL_HOST'): + kwargs['virtual_host'] = vhost + if username := os.environ.get('AMQP_USERNAME'): + password = os.environ.get('AMQP_PASSWORD', '') + kwargs['credentials'] = pika.PlainCredentials( + username, password + ) + elif os.environ.get('AMQP_EXTERNAL_CREDENTIALS'): + kwargs['credentials'] = pika.credentials.ExternalCredentials() + if ( + 'AMQP_CA_CERT' in os.environ + or 'AMQP_CLIENT_CERT' in os.environ + or 'AMQP_CLIENT_KEY' in os.environ + ): + sslctx = ssl.create_default_context( + cafile=os.environ.get('AMQP_CA_CERT') + ) + if certfile := os.environ.get('AMQP_CLIENT_CERT'): + keyfile = os.environ.get('AMQP_CLIENT_KEY') + keypassword = os.environ.get('AMQP_CLIENT_KEY_PASSWORD') + sslctx.load_cert_chain(certfile, keyfile, keypassword) + kwargs['ssl_options'] = pika.SSLOptions( + sslctx, kwargs.get('host') + ) + + params = pika.ConnectionParameters(**kwargs) + return cls(params) + def close(self): if self._channel: self._channel.close() @@ -420,7 +466,7 @@ class Kubernetes(HttpxClientMixin): class Context: def __init__(self): - self.amqp = AMQPContext() + self.amqp = AMQPContext.from_env() async def handle_firefly_transaction(xact: FireflyIIITransaction) -> None: @@ -565,10 +611,10 @@ async def start_ansible_job(): async def publish_host_info(hostname: str, sshkeys: str): await context.amqp.connect() - await context.amqp.queue_declare('host-provision', durable=True) + await context.amqp.queue_declare(HOST_INFO_QUEUE, durable=True) context.amqp.publish( exchange='', - routing_key='host-provision', + routing_key=HOST_INFO_QUEUE, body=json.dumps( { 'hostname': hostname,