#!/usr/bin/env python3 import base64 import dataclasses import logging import os import tempfile from typing import Optional from pathlib import Path import git import kubernetes.client import kubernetes.config import ruamel.yaml log = logging.getLogger("cert-exporter") DEBUG = os.environ.get("CERT_EXPORTER_DEBUG") == "1" CONFIG_FILE = os.environ.get( "CERT_EXPORTER_CONFIG", "/etc/cert-exporter/config.yml", ) @dataclasses.dataclass class CertConfig: name: str namespace: str key: Path cert: Path bundle: Optional[Path] = None @dataclasses.dataclass class Configuration: git_repo: str certs: list[CertConfig] = dataclasses.field(default_factory=list) @classmethod def load(cls, path: Optional[Path] = None) -> "Configuration": if path is None: path = Path(CONFIG_FILE) with path.open("r", encoding="utf-8") as f: values = ruamel.yaml.safe_load(f) config = Configuration( git_repo=values["git_repo"], ) for cert in values.get("certs", ()): config.certs.append( CertConfig( name=cert["name"], namespace=cert["namespace"], key=Path(cert["key"]), cert=Path(cert["cert"]), bundle=Path(cert.get("bundle")), ) ) return config def update_cert(cert: CertConfig, api: kubernetes.client.ApiClient) -> None: core = kubernetes.client.CoreV1Api(api) log.info( "Fetching certificate from Secret %s in namespace %s", cert.name, cert.namespace, ) try: secret = core.read_namespaced_secret(cert.name, cert.namespace) except kubernetes.client.ApiException as e: log.error( "Could not get certificate from Secret %s in namespace %s: %s", cert.name, cert.namespace, e, ) return key = base64.b64decode(secret.data["tls.key"]) crt = base64.b64decode(secret.data["tls.crt"]) if not cert.key.parent.exists(): cert.key.parent.mkdir(parents=True) with cert.key.open("wb") as f: log.info("Writing certificate private key to %s", f.name) f.write(key) if not cert.cert.parent.exists(): cert.cert.parent.mkdir(parents=True) with cert.cert.open("wb") as f: log.info("Writing certificate to %s", f.name) f.write(crt) if cert.bundle is not None: if not cert.bundle.parent.exists(): cert.bundle.parent.mkdir(parents=True) with cert.bundle.open("wb") as f: log.info("Writing certificate bundle to %s", f.name) f.write(key) f.write(crt) def main(): logging.basicConfig( level=logging.DEBUG if DEBUG else logging.INFO, ) logging.getLogger("kubernetes.client.rest").setLevel(logging.INFO) config = Configuration.load() kubernetes.config.load_config() with tempfile.TemporaryDirectory() as d: os.chdir(d) log.debug("Cloning Git repo %s to %s", config.git_repo, d) repo = git.Repo.clone_from(config.git_repo, d) with kubernetes.client.ApiClient() as k: log.debug("Using Kubernetes API server %s", k.configuration.host) for cert in config.certs: try: update_cert(cert, k) except Exception as e: log.error( "Failed to update certificate %s: %s", cert.name, e ) continue if repo.is_dirty(): log.info("Committing updated certificates") for diff in repo.index.diff(None): repo.index.add(diff.b_path) repo.index.commit("Update certificates") log.info("Pushing new refs to origin remote") repo.remotes.origin.push().raise_if_error() log.info("Successfully updated certificates") else: log.info("No certificates to update") if __name__ == "__main__": main()