configpolicy/plugins/inventory/pyrocufflink.py

249 lines
7.6 KiB
Python

import enum
import functools
import logging
from typing import cast, Any, Iterator, Optional
from xml.etree import ElementTree as etree
import libvirt
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable
# Plugin documentation in Ansible's YAML format.  The ``options`` mapping
# declares the settings that ``InventoryModule.parse`` reads through
# ``get_option``; nesting is significant, so keep the indentation intact.
DOCUMENTATION = r'''
name: pyrocufflink
extends_documentation_fragment:
  - constructed
short_description: Pyrocufflink inventory source
version_added: ''
description:
  - Dynamic inventory for Pyrocufflink machines
author:
  - Dustin C. Hatch <dustin@hatch.name>
options:
  plugin:
    description:
      Token that ensures this is a source file for the 'pyrocufflink' plugin.
    required: True
    choices: ['pyrocufflink']
  uri:
    description: libvirt connection URI
    type: list
    required: true
  read_only:
    description: Open a read-only connection to libvirt
    type: boolean
    default: false
  dns_domain:
    description: DNS domain to append to single-label VM names
    type: string
  exclude_off:
    description: Exclude libvirt domains that are powered off
    type: boolean
    default: false
  exclude_os:
    description: Exclude domains running specified operating systems
    type: list
    default: []
  log_excluded:
    description:
      Print a log message about excluded domains, useful for troubleshooting
    type: boolean
    default: false
'''
# Module-level logger for this inventory plugin.
log = logging.getLogger('ansible.plugins.inventory.pyrocufflink')


def _discard_libvirt_error(*_args):
    """Ignore whatever libvirt passes to its error callback."""
    return None


# Remove the default error handler, which prints to stderr
libvirt.registerErrorHandler(_discard_libvirt_error, None)
class XMLNS:
    """XML namespace URIs used to look up custom libvirt domain metadata."""

    # Namespace read by ``Host.libosinfo``
    libosinfo = "http://libosinfo.org/xmlns/libvirt/domain/1.0"
    # Namespace read by ``Host.metadata``
    dch = "http://du5t1n.me/xmlns/libvirt/metadata/"
class DomainState(enum.IntEnum):
    """Integer state codes for a libvirt domain.

    Member names are emitted as the ``state`` inventory variable, so they
    should be spelled the way users expect to match on them.
    """

    nostate = 0
    running = 1
    blocked = 2
    # Original misspelling, kept as an alias of ``blocked`` so existing
    # references keep working; ``DomainState(2).name`` is now 'blocked'.
    blockde = 2
    paused = 3
    shutdown = 4
    shutoff = 5
    crashed = 6
    pmsuspend = 7
class InventoryModule(BaseInventoryPlugin, Constructable):
    """Inventory plugin that adds libvirt domains as Ansible hosts."""

    NAME = 'pyrocufflink'

    def parse(self, inventory, loader, path, cache=True):
        """Populate the inventory from every configured libvirt URI.

        Reads the plugin options from the source file at *path*, opens one
        libvirt connection per ``uri`` entry, and adds a host for each
        domain that is not filtered out by ``exclude_off`` or
        ``exclude_os``.  Each connection is closed once its domains have
        been processed, even if processing raises.
        """
        super().parse(inventory, loader, path, cache=cache)
        config_data = self._read_config_data(path)
        self._consume_options(config_data)

        uri_list = self.get_option('uri')
        if not isinstance(uri_list, list):
            # Accept a single URI given as a scalar
            uri_list = [uri_list]
        read_only = cast(bool, self.get_option('read_only'))
        dns_domain = self.get_option('dns_domain')
        exclude_off = cast(bool, self.get_option('exclude_off'))
        # Treat an explicit null as "exclude nothing" so the membership
        # test below cannot fail on None
        exclude_os = cast(list[str], self.get_option('exclude_os') or [])
        log_excluded = cast(bool, self.get_option('log_excluded'))
        assert self.inventory

        for uri in uri_list:
            if read_only:
                conn = libvirt.openReadOnly(uri)
            else:
                conn = libvirt.open(uri)
            try:
                for dom in conn.listAllDomains():
                    host = Host(dom)
                    if self._libvirt_excluded(
                        host, exclude_off, exclude_os, log_excluded
                    ):
                        continue
                    self._add_libvirt_host(host, uri, dns_domain, read_only)
            finally:
                # Release the connection; everything needed from the
                # domains has already been copied into the inventory
                conn.close()

    def _libvirt_excluded(self, host, exclude_off, exclude_os, log_excluded):
        """Return True if *host* is filtered out by the exclusion options."""
        state = host.get_state()[0]
        if state == DomainState.shutoff and exclude_off:
            if log_excluded:
                log.warning(
                    'Excluding libvirt domain %s in state %r',
                    host.name,
                    DomainState.shutoff.name,
                )
            return True
        if host.os_id in exclude_os:
            if log_excluded:
                log.warning(
                    'Excluding libvirt domain %s with OS %r',
                    host.name,
                    host.os_id,
                )
            return True
        return False

    def _add_libvirt_host(self, host, uri, dns_domain, read_only):
        """Add *host* to the inventory with its groups and variables."""
        # Prefer the domain title as the hostname, falling back to its name
        inventory_hostname = host.title if host.title else host.name
        if dns_domain and '.' not in inventory_hostname:
            # Qualify single-label names with the configured DNS domain
            inventory_hostname = f'{inventory_hostname}.{dns_domain}'
        self.inventory.add_host(inventory_hostname)
        for group in host.groups:
            self.inventory.add_group(group)
            self.inventory.add_host(inventory_hostname, group)
        self.inventory.set_variable(inventory_hostname, 'libvirt_uri', uri)
        self.inventory.set_variable(
            inventory_hostname, 'libvirt', host.variables(read_only)
        )
class Host:
    """Inventory-oriented view of a single libvirt domain.

    Wraps a ``libvirt.virDomain`` and exposes its name, title, description
    and custom metadata.  Values fetched from libvirt through the cached
    properties are looked up at most once per instance.
    """

    def __init__(self, domain: 'libvirt.virDomain') -> None:
        self.domain = domain

    @functools.cached_property
    def description(self) -> Optional[str]:
        """Domain description, or ``None`` if libvirt reports an error."""
        try:
            # 0 == VIR_DOMAIN_METADATA_DESCRIPTION
            return self.domain.metadata(0, None)
        except libvirt.libvirtError as e:
            log.debug(
                'Could not get description for domain %s: %s', self.name, e
            )
            return None

    @functools.cached_property
    def groups(self) -> list[str]:
        """Group names declared in the domain's custom metadata."""
        return list(self._groups())

    @functools.cached_property
    def libosinfo(self) -> Optional[etree.Element]:
        """Parsed libosinfo metadata element, or ``None`` if unavailable."""
        return self._element_metadata(XMLNS.libosinfo)

    @functools.cached_property
    def metadata(self) -> Optional[etree.Element]:
        """Parsed custom (``XMLNS.dch``) metadata element, or ``None``."""
        return self._element_metadata(XMLNS.dch)

    @functools.cached_property
    def name(self) -> str:
        """Name of the libvirt domain."""
        return self.domain.name()

    @functools.cached_property
    def os_id(self) -> Optional[str]:
        """The ``id`` attribute of the libosinfo ``os`` element, if any."""
        if self.libosinfo is None:
            return None
        # ``{*}`` matches the child whether or not it carries a namespace;
        # a plain 'os' lookup misses prefixed elements like libosinfo:os
        if (os := self.libosinfo.find('{*}os')) is not None:
            return os.get('id')
        return None

    @functools.cached_property
    def title(self) -> Optional[str]:
        """Domain title, or ``None`` if libvirt reports an error."""
        try:
            # 1 == VIR_DOMAIN_METADATA_TITLE
            return self.domain.metadata(1, None)
        except libvirt.libvirtError as e:
            log.debug('Could not get title for domain %s: %s', self.name, e)
            return None

    def get_guest_info(self) -> Optional[dict[str, str]]:
        """Return the domain's guest info, or ``None`` on a libvirt error."""
        try:
            return self.domain.guestInfo()
        except libvirt.libvirtError as e:
            log.error('Could not get guest info for %s: %s', self.name, e)
            return None

    def get_state(self) -> tuple[int, int]:
        """Return the ``(state, reason)`` codes reported by libvirt."""
        state, reason = self.domain.state()
        return state, reason

    def variables(self, read_only: bool = False) -> dict[str, Any]:
        """Build the dictionary stored as the host's ``libvirt`` variable.

        Always contains ``state_code`` and ``reason_code``.  ``title``,
        ``description`` and ``os_id`` are included when non-empty, and
        ``state`` when the code maps to a known ``DomainState``.  Guest
        info is added only for running domains on a read-write connection.
        """
        values: dict[str, Any] = {}
        if title := self.title:
            values['title'] = title
        if description := self.description:
            values['description'] = description
        if self.os_id:
            values['os_id'] = self.os_id
        state_code, reason_code = self.get_state()
        values['state_code'] = state_code
        values['reason_code'] = reason_code
        try:
            state = DomainState(state_code)
        except ValueError:
            # Enum lookup by value raises ValueError (not KeyError) for
            # codes that have no member
            log.warning(
                'Unknown state for domain %s: %s:', self.name, state_code
            )
            state = None
        else:
            values['state'] = state.name
        if not read_only and state is DomainState.running:
            if guest_info := self.get_guest_info():
                values['guest_info'] = guest_info
        return values

    def _element_metadata(self, uri: str) -> Optional[etree.Element]:
        """Fetch and parse the metadata element registered under *uri*."""
        try:
            # 2 == VIR_DOMAIN_METADATA_ELEMENT
            raw = self.domain.metadata(2, uri)
        except libvirt.libvirtError as e:
            log.debug(
                'Could not get extended domain metadata for %s: %s',
                self.name,
                e,
            )
            return None
        return etree.fromstring(raw)

    def _groups(self) -> Iterator[str]:
        """Yield the ``name`` of every ``group`` under ``groups``."""
        if self.metadata is None:
            return
        # Compare against None explicitly: Element truthiness reflects the
        # number of children and is deprecated
        groups = self.metadata.find('{*}groups')
        if groups is None:
            return
        # Namespace-agnostic search over all descendants
        for elem in groups.iterfind('.//{*}group'):
            if group_name := elem.get('name'):
                yield group_name