configpolicy/filter_plugins/age.py

38 lines
920 B
Python

import os
import subprocess
from typing import Callable, Optional
from ansible.errors import AnsibleError
class AgeError(AnsibleError):
pass
class FilterModule:
def filters(self) -> dict[str, Callable[..., str]]:
return {
'decrypt': age_filter,
}
def age_filter(data: str, key: Optional[str] = None) -> str:
if key is None:
key = 'age.key'
key = os.path.expanduser(key)
p = subprocess.Popen(
['age', '-d', '-i', key],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate(data.encode('utf-8'))
if p.returncode != 0:
error = ' '.join(
l
for l in stderr.decode('utf-8', errors='replace').splitlines()
if not l.startswith('age: report unexpected')
)
raise AgeError(error)
return stdout.decode('utf-8')