From e9da2cdd28d5413864762a36b6e90f6020e5a160 Mon Sep 17 00:00:00 2001 From: "Dustin C. Hatch" Date: Sat, 8 Feb 2025 09:44:25 -0600 Subject: [PATCH] host/online: Set TTL for AMQP messages Setting messages to expire after 10 minutes without being consumed. If they haven't been consumed by then, there must be something wrong with the host provisioner. Since each host provisioner process only processes a single message, placing more messages onto the queue without an expiration will cause a backlog of messages that cannot be processed. --- dch_webhooks.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/dch_webhooks.py b/dch_webhooks.py index 44bf4e1..15bbe1a 100644 --- a/dch_webhooks.py +++ b/dch_webhooks.py @@ -67,6 +67,8 @@ PAPERLESS_URL = os.environ.get( ANSIBLE_JOB_YAML = Path(os.environ.get('ANSIBLE_JOB_YAML', 'ansible-job.yaml')) ANSIBLE_JOB_NAMESPACE = os.environ.get('ANSIBLE_JOB_NAMESPACE', 'ansible') HOST_INFO_QUEUE = os.environ.get('HOST_INFO_QUEUE', 'host-provisioner') +HOST_INFO_TTL = 600000 + KUBERNETES_TOKEN_PATH = Path( os.environ.get( 'KUBERNETES_TOKEN_PATH', @@ -364,12 +366,19 @@ class AMQPContext: ) self._channel = await fut - def publish(self, exchange: str, routing_key: str, body: bytes): + def publish( + self, + exchange: str, + routing_key: str, + body: bytes, + properties: Optional[pika.BasicProperties] = None, + ): assert self._channel self._channel.basic_publish( exchange, routing_key, body, + properties, ) async def queue_declare( @@ -620,10 +629,13 @@ async def publish_host_info( data['branch'] = branch await context.amqp.connect() await context.amqp.queue_declare(HOST_INFO_QUEUE, durable=True) + properties = pika.BasicProperties() + properties.expiration = str(HOST_INFO_TTL) context.amqp.publish( exchange='', routing_key=HOST_INFO_QUEUE, body=json.dumps(data).encode('utf-8'), + properties=properties, )