From d19481b0630e8764c15901296321f9be644990e0 Mon Sep 17 00:00:00 2001 From: Updatebot Date: Sun, 25 Aug 2024 08:48:31 -0500 Subject: [PATCH] Initial commit --- .gitignore | 5 + pyproject.toml | 37 ++++++ updatebot.py | 328 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 .gitignore create mode 100644 pyproject.toml create mode 100644 updatebot.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..885f5a4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/.venv +/config.toml +*.egg-info/ +__pycache__/ +*.py[co] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..0ba39cd --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,37 @@ +[project] +name = "updatebot" +authors = [ + {name = "Dustin C. Hatch", email = "dustin@hatch.name"}, +] +description = "Open PRs to update applications deployed in Kubernetes" +requires-python = ">=3.12" +license = {text = "0BSD"} +classifiers = [ + "License :: OSI Approved :: Zero-Clause BSD (0BSD)", + "Programming Language :: Python :: 3", +] +dependencies = [ + "GitPython~=3.1.43", + "colorlog~=6.8.2", + "pydantic~=2.8.2", + "requests~=2.32.3", + "ruamel.yaml~=0.18.6", +] +dynamic = ["version"] + +[project.scripts] +updatebot = "updatebot:main" + +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[tool.setuptools_scm] + +[tool.pyright] +venvPath = '.' +venv = '.venv' + +[tool.black] +line-length = 79 +skip-string-normalization = true diff --git a/updatebot.py b/updatebot.py new file mode 100644 index 0000000..62e94b5 --- /dev/null +++ b/updatebot.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python +import abc +import argparse +import logging +import functools +import os +import re +import tempfile +import threading +import tomllib +import urllib.parse +from pathlib import Path +from typing import ClassVar, Literal, Union, Optional + +import colorlog +import pydantic +import git +import requests +import ruamel.yaml + + +log = logging.getLogger('updatebot') + +TRACE = 5 +logging.addLevelName(TRACE, 'TRACE') + + +XDG_CONFIG_HOME = ( + Path(os.environ['XDG_CONFIG_HOME']) + if 'XDG_CONFIG_HOME' in os.environ + else Path('~/.config').expanduser() +) + + +tls = threading.local() + + +def _get_session() -> requests.Session: + if hasattr(tls, 'session'): + session = tls.session + else: + log.debug('Starting new HTTP/HTTPS client session') + session = tls.session = requests.Session() + return session + + +class BaseSource(abc.ABC, pydantic.BaseModel): + @abc.abstractmethod + def get_latest_version(self) -> str: + raise NotImplementedError + + +class GithubSource(BaseSource): + kind: Literal['github'] + organization: str + repo: str + version_re: str = r'(?P[0-9]+(\.[0-9]+)*(-[0-9]+)?)' + + def get_latest_version(self) -> str: + session = _get_session() + url = ( + f'https://api.github.com/repos/' + f'{self.organization}/{self.repo}/releases/latest' + ) + r = session.get(url) + release = r.json() + m = re.search(self.version_re, release['name']) + if not m: + log.warning( + 'Release name "%s" did not match regular expression "%s"', + release['name'], + self.version_re, + ) + return release['name'] + return m.groupdict()['version'] + + +class DockerHubSource(BaseSource): + kind: Literal['docker'] + namespace: str + repository: str + + def get_latest_version(self) -> str: + session = _get_session() + url = ( + f'https://hub.docker.com/v2/' + f'namespaces/{self.namespace}/repositories/{self.repository}/tags' + ) + r = session.get(url) + data = r.json() + versions = [] + for result in data['results']: + if result['name'] == 'latest': + continue + versions.append((result['last_updated'], result['name'])) + versions.sort() + return versions[-1][-1] + + +Source = Union[ + GithubSource, + DockerHubSource, +] + + +class BaseProject(abc.ABC, pydantic.BaseModel): + path: Optional[Path] = None + source: Source + image: str + tag_format: str = '{version}' + + @abc.abstractmethod + def apply_update(self, path: Path, version: str) -> None: + raise NotImplementedError + + +class KustomizeProject(BaseProject): + kind: Literal['kustomize'] + + kustomize_files: ClassVar[list[str]] = [ + 'kustomization.yaml', + 'kustomization.yml', + 'Kustomization', + ] + + def apply_update(self, path: Path, version: str) -> None: + for filename in self.kustomize_files: + filepath = path / filename + if filepath.is_file(): + break + else: + filepath = path / self.kustomize_files[0] + yaml = ruamel.yaml.YAML() + with filepath.open('rb') as f: + kustomization = yaml.load(f) + images = kustomization.setdefault('images', []) + new_tag = self.tag_format.format(version=version) + for image in images: + if image['image'] == self.image: + image['newTag'] = new_tag + break + else: + images.append({'image': self.image, 'newTag': new_tag}) + with filepath.open('wb') as f: + yaml.dump(kustomization, f) + + +class DirectoryProject(BaseProject): + kind: Literal['dir'] | Literal['directory'] + + +Project = Union[ + KustomizeProject, + DirectoryProject, +] + + +class RepoConfig(pydantic.BaseModel): + url: str + token_file: Path + + @functools.cached_property + def repo_api_url(self) -> str: + urlparts = urllib.parse.urlsplit(self.url) + urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}') + return urllib.parse.urlunsplit(urlparts) + + @functools.cached_property + def auth_token(self) -> str: + return self.token_file.read_text().strip() + + def get_git_url(self) -> str: + session = _get_session() + r = session.get( + self.repo_api_url, + headers={ + 'Authorization': f'token {self.auth_token}', + }, + ) + data = r.json() + if ssh_url := data.get('ssh_url'): + return ssh_url + return data['clone_url'] + + def create_pr( + self, title: str, source_branch: str, target_branch: str + ) -> None: + session = _get_session() + r = session.post( + f'{self.repo_api_url}/pulls', + headers={ + 'Authorization': f'token {self.auth_token}', + }, + json={ + 'title': title, + 'base': target_branch, + 'head': source_branch, + }, + ) + log.log(TRACE, '%r', r.content) + if 300 > r.status_code >= 200: + data = r.json() + log.info('Created pull request: %s', data['url']) + elif r.status_code == 409: + data = r.json() + log.warning('%s', data['message']) + elif r.status_code < 500: + data = r.json() + log.error('Failed to create PR: %s', data['message']) + else: + log.error('Failed to create PR: %r', r.content) + + +class Config(pydantic.BaseModel): + repo: RepoConfig + projects: dict[str, Project] + + +class Arguments: + config: Path + branch_name: str + projects: list[str] + + +def update_project(repo: git.Repo, name: str, project: Project) -> git.Commit: + basedir = Path(repo.working_dir) + log.debug('Checking for latest version of %s', name) + latest = project.source.get_latest_version() + log.info('Found version %s for %s', latest, name) + log.debug('Applying update for %s version %s', name, latest) + path = basedir / (project.path or name) + log.debug('Committing changes to %s', path) + project.apply_update(path, latest) + repo.index.add(str(path)) + c = repo.index.commit(f'{name}: Update to {latest}') + log.info('Commited %s %s', str(c)[:7], c.summary) + return c + + +def parse_args() -> Arguments: + parser = argparse.ArgumentParser() + parser.add_argument( + '--config', + '-c', + type=Path, + default=XDG_CONFIG_HOME / 'updatebot' / 'config.toml', + ) + parser.add_argument('--branch-name', '-b', default='updatebot') + parser.add_argument('projects', metavar='project', nargs='*', default=[]) + return parser.parse_args(namespace=Arguments()) + + +def setup_logging() -> None: + handler = colorlog.StreamHandler() + handler.setFormatter( + colorlog.ColoredFormatter( + '%(log_color)s%(levelname)8s%(reset)s ' + '%(bold_white)s%(name)s%(reset)s ' + '%(message)s', + log_colors={ + 'TRACE': 'purple', + 'DEBUG': 'blue', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'bold_red', + }, + ) + ) + handler.setLevel(os.environ.get('LOG_LEVEL', 'DEBUG')) + logging.root.addHandler(handler) + logging.root.setLevel(logging.DEBUG) + log.setLevel(TRACE) + + +def main() -> None: + setup_logging() + args = parse_args() + with args.config.open('rb') as f: + data = tomllib.load(f) + config = Config.model_validate(data) + log.debug('Using configuration: %s', config) + all_projects = list(config.projects.keys()) + projects = [] + if args.projects: + for project in args.projects: + if project not in all_projects: + log.warning('Unknown project: %s', project) + continue + projects.append(project) + else: + projects = all_projects + if not projects: + log.error('No projects') + raise SystemExit(1) + if log.isEnabledFor(logging.INFO): + log.info('Updating projects: %s', ', '.join(projects)) + with tempfile.TemporaryDirectory(prefix='updatebot.') as d: + log.debug('Using temporary working directory: %s', d) + d = Path(d) + log.debug('Retreiving repository Git URL') + repo_url = config.repo.get_git_url() + repo = git.Repo.clone_from(repo_url, d, depth=1) + log.debug('Checking out new branch: %s', args.branch_name) + repo.heads[0].checkout(force=True, B=args.branch_name) + title = None + for project in projects: + commit = update_project(repo, project, config.projects[project]) + if not title: + if not isinstance(commit.summary, str): + title = bytes(commit.summary).decode( + 'utf-8', errors='replace' + ) + else: + title = commit.summary + if not title: + return + repo.head.reference.set_tracking_branch( + git.RemoteReference( + repo, f'refs/remotes/origin/{args.branch_name}' + ) + ) + repo.remote().push(force=True) + config.repo.create_pr(title, args.branch_name, 'master') + + +if __name__ == '__main__': + main()