# configpolicy/roles/step-ssh/library/ssh_host_certs.py
#
# 137 lines
# 4.1 KiB
# Python

import email.mime.application
import email.mime.multipart
import email.mime.nonmultipart
import email.parser
import email.policy
import json
import logging
import os
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from ansible.module_utils.basic import AnsibleModule
# Module-level logger used by the request and certificate-writing functions.
log = logging.getLogger('ssh_cert_sign')
# A decoded JSON object (string keys, arbitrary values).
Json = dict[str, Any]
# One form field as a (name, value) pair.  A ``str`` value is sent as plain
# text; a ``Path`` value is read from disk and sent as a file upload.
FormField = tuple[str, Union[str, Path]]
class SignFailed(Exception):
    """Raised when the signing service answers a request with an HTTP error."""
def ansible_main():
    """Ansible module entry point: make sure SSH host certificates exist.

    Module parameters (all optional strings):

    * ``url`` -- signing endpoint to submit the host keys to.
    * ``hostname`` -- name to request certificates for; defaults to the
      node name reported by ``os.uname()``.
    * ``ssh_dir`` -- directory holding the host keys and certificates.

    When ``ssh_dir`` already contains at least one host certificate the
    module reports ``changed=False`` and does nothing else.  Otherwise it
    reports ``changed=True`` and, unless running in check mode, requests new
    certificates and writes them to ``ssh_dir``.  Any exception raised while
    doing so is reported through ``fail_json``.
    """
    argument_spec = dict(
        url=dict(
            type='str',
            required=False,
            default='https://webhooks.pyrocufflink.blue/sshkeys/sign',
        ),
        hostname=dict(
            type='str',
            required=False,
            default=os.uname().nodename,
        ),
        ssh_dir=dict(
            type='str',
            required=False,
            default='/etc/ssh',
        ),
    )
    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True,
    )

    params = module.params
    sign_url = params['url']
    host = params['hostname']
    cert_dir = Path(params['ssh_dir'])

    certs = list_certs(cert_dir)
    # Certificates are only requested when none are present at all.
    changed = not certs
    if changed:
        if not module.check_mode:
            try:
                certs = sign_and_save_certs(sign_url, host, cert_dir)
            except Exception as exc:
                module.fail_json(msg=str(exc))
    module.exit_json(changed=changed, certs=list(map(str, certs)))
def list_certs(sshdir: Path) -> list[Path]:
    """Return the SSH host certificate paths found in *sshdir*.

    A path qualifies when its name matches ``ssh_host_*_key-cert.pub``.
    The result is in whatever order the directory listing yields and is
    empty when nothing matches.
    """
    pattern = 'ssh_host_*_key-cert.pub'
    return [*sshdir.glob(pattern)]
def multipart_data(fields: Iterable[FormField]) -> tuple[str, bytes]:
    """Encode *fields* as a ``multipart/form-data`` request body.

    Each field is a ``(name, value)`` pair.  A ``Path`` value is read from
    disk and attached as a file part (built with ``MIMEApplication``) that
    carries the file's base name as ``filename``; a ``str`` value is
    attached as a UTF-8 encoded ``text/plain`` part.

    :returns: ``(content_type, body)`` where *content_type* is the value for
        the request's ``Content-Type`` header (it includes the generated
        boundary) and *body* is the encoded payload without the top-level
        headers.
    """
    disposition = 'Content-Disposition'
    form = email.mime.multipart.MIMEMultipart('form-data')
    for field_name, field_value in fields:
        is_file = isinstance(field_value, Path)
        if is_file:
            part = email.mime.application.MIMEApplication(
                field_value.read_bytes()
            )
        else:
            part = email.mime.nonmultipart.MIMENonMultipart('text', 'plain')
            part.set_payload(field_value.encode('utf-8'))
        part.add_header(disposition, 'form-data')
        # Parameter order matters for the serialized header: ``filename``
        # (files only) is added before ``name``.
        if is_file:
            part.set_param('filename', field_value.name, header=disposition)
        part.set_param('name', field_name, header=disposition)
        # The MIME classes add MIME-Version to every part; drop it from
        # the individual form parts.
        del part['MIME-Version']
        form.attach(part)

    serialized = form.as_bytes(policy=email.policy.HTTP)
    # Split the top-level headers from the body at the first blank line,
    # then recover the Content-Type (with its boundary) from those headers.
    raw_headers, _sep, body = serialized.partition(b'\r\n\r\n')
    parsed = email.parser.BytesHeaderParser().parsebytes(raw_headers)
    return (parsed['Content-Type'], body)
def request_sign(
    url: str, hostname: str, sshdir: Optional[Path] = None
) -> Json:
    """Submit the host's public keys to the signing service.

    Every ``ssh_host_*_key.pub`` file in *sshdir* is uploaded as a ``keys``
    form field, together with a ``hostname`` field, in a single
    ``multipart/form-data`` request to *url*.

    :param url: Address of the signing endpoint.
    :param hostname: Value sent in the ``hostname`` form field.
    :param sshdir: Directory containing the host public keys; ``/etc/ssh``
        when omitted.
    :returns: The JSON document decoded from the response body.
    :raises SignFailed: If the server answers with an HTTP error status.
        The message is taken from the ``errors`` member of a JSON error
        body when present, otherwise from the raw body text, otherwise
        from the HTTP reason phrase.
    """
    if sshdir is None:
        sshdir = Path('/etc/ssh')
    fields: list[FormField] = [('hostname', hostname)]
    fields.extend(
        ('keys', path) for path in sshdir.glob('ssh_host_*_key.pub')
    )
    content_type, data = multipart_data(fields)
    req = urllib.request.Request(url, data=data)
    req.add_header('Content-Type', content_type)
    try:
        res = urllib.request.urlopen(req)
    except urllib.error.HTTPError as e:
        with e:
            buf = e.read()
        try:
            r = json.loads(buf)
        except ValueError as ve:
            log.error('Error decoding response as JSON: %s', ve)
        else:
            # An error body can be any JSON value (array, string, ...), not
            # only an object; only objects can carry an ``errors`` member.
            errors = r.get('errors') if isinstance(r, dict) else None
            if errors:
                # Accept a single value as well as a list, and tolerate
                # non-string entries, so the server's explanation is
                # reported instead of a TypeError from ``str.join``.
                if not isinstance(errors, (list, tuple)):
                    errors = [errors]
                raise SignFailed('; '.join(str(x) for x in errors)) from e
        # No usable ``errors`` member: fall back to the raw body, then to
        # the HTTP reason phrase.
        msg = buf.strip().decode('utf-8', errors='replace') or str(e.reason)
        raise SignFailed(msg) from e
    else:
        with res:
            return json.load(res)
def sign_and_save_certs(url: str, hostname: str, sshdir: Path) -> list[Path]:
    """Request host certificates and write them into *sshdir*.

    The ``certificates`` member of the signing response maps file names to
    certificate text; each entry is written as a UTF-8 text file directly
    inside *sshdir*.

    :param url: Address of the signing endpoint.
    :param hostname: Host name to request certificates for.
    :param sshdir: Directory the public keys are read from and the
        certificates are written to.
    :returns: The paths of the certificate files that were written.
    :raises SignFailed: If the request fails, or if the response names a
        certificate file that is not a bare file name.
    :raises KeyError: If the response has no ``certificates`` member.
    """
    response = request_sign(url, hostname, sshdir)

    # The file names are chosen by the remote service.  Refuse anything
    # that is not a bare file name (path separators, absolute paths, ``..``)
    # so a response can never direct a write outside of *sshdir*.  All names
    # are checked before the first file is written.
    pending = []
    for name, content in response['certificates'].items():
        if name in ('', '.', '..') or Path(name).name != name:
            raise SignFailed(
                f'Invalid certificate file name in response: {name!r}'
            )
        pending.append((sshdir / name, content))

    certs = []
    for path, content in pending:
        log.info('Writing new certificate to %s', path)
        with path.open('w', encoding='utf-8') as f:
            f.write(content)
        certs.append(path)
    return certs
# Run the module only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    ansible_main()