Compare commits
11 Commits
72eae4d5b3
...
bdcb8c93b6
Author | SHA1 | Date |
---|---|---|
|
bdcb8c93b6 | |
|
3ff18d1042 | |
|
0f9b3a5ac5 | |
|
e4742f1c6e | |
|
76cb7c7958 | |
|
bef7206642 | |
|
28fe49c2b2 | |
|
9f113d6a3f | |
|
8de0d93eb1 | |
|
43aba0c848 | |
|
b30b38f76f |
|
@ -1,5 +1,8 @@
|
|||
*
|
||||
!.git
|
||||
!chase2fa.py
|
||||
!entrypoint.sh
|
||||
!pinentry-stub.sh
|
||||
!pyproject.toml
|
||||
!secretsocket.py
|
||||
!xactfetch.py
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
/.venv
|
||||
/cookies.json
|
||||
/screenshot_*.png
|
||||
/secrets.toml
|
||||
*.egg-info/
|
||||
__pycache__/
|
||||
*.py[co]
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM registry.fedoraproject.org/fedora-minimal:38 AS build
|
||||
FROM git.pyrocufflink.net/containerimages/dch-base AS build
|
||||
|
||||
RUN --mount=type=cache,target=/var/cache \
|
||||
microdnf install -y \
|
||||
|
@ -18,11 +18,22 @@ RUN --mount=type=cache,target=/var/cache \
|
|||
python3-wheel \
|
||||
&& :
|
||||
|
||||
COPY . /src
|
||||
COPY .git /src/.git
|
||||
COPY xactfetch.py pyproject.toml /src
|
||||
|
||||
RUN python3 -m pip wheel -w /wheels /src
|
||||
|
||||
FROM registry.fedoraproject.org/fedora-minimal:38
|
||||
|
||||
FROM scratch AS mixin
|
||||
|
||||
COPY pinentry-stub.sh /usr/local/bin/pinentry-stub
|
||||
COPY secretsocket.py /usr/local/bin/secretsocket
|
||||
COPY chase2fa.py /usr/local/bin/chase2fa
|
||||
|
||||
COPY entrypoint.sh /entrypoint.sh
|
||||
|
||||
|
||||
FROM git.pyrocufflink.net/containerimages/dch-base
|
||||
|
||||
RUN --mount=type=cache,target=/var/cache \
|
||||
microdnf install -y \
|
||||
|
@ -47,11 +58,16 @@ RUN --mount=type=cache,target=/var/cache \
|
|||
libXrandr \
|
||||
libXrender \
|
||||
libXtst \
|
||||
libdrm \
|
||||
libxcb \
|
||||
mesa-libgbm \
|
||||
nspr \
|
||||
nss \
|
||||
pango \
|
||||
python3 \
|
||||
python3-pip \
|
||||
tini \
|
||||
xorg-x11-server-Xvfb \
|
||||
&& echo xactfetch:x:2468: >> /etc/group \
|
||||
&& echo xactfetch:*:2468:2468:xactfetch:/var/lib/xactfetch:/sbin/nologin >> /etc/passwd \
|
||||
&& :
|
||||
|
@ -61,16 +77,15 @@ ENV PLAYWRIGHT_BROWSERS_PATH=/usr/local/playwright/browsers
|
|||
RUN --mount=type=bind,from=build,source=/,target=/build \
|
||||
python3 -m pip install --no-index -f /build/wheels xactfetch \
|
||||
&& cp /build/root/.cargo/bin/rbw* /usr/local/bin/ \
|
||||
&& install /build/src/pinentry-stub.sh /usr/local/bin/pinentry-stub \
|
||||
&& playwright install firefox \
|
||||
&& playwright install chromium \
|
||||
&& :
|
||||
|
||||
COPY --from=mixin / /
|
||||
|
||||
VOLUME /var/lib/xactfetch
|
||||
|
||||
WORKDIR /var/lib/xactfetch
|
||||
|
||||
USER 2468:2468
|
||||
|
||||
ENV XDG_CONFIG_HOME=/etc
|
||||
|
||||
ENTRYPOINT ["tini", "xactfetch", "--"]
|
||||
ENTRYPOINT ["/entrypoint.sh"]
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
#!/usr/bin/env python3
|
||||
import re
|
||||
|
||||
import httpx
|
||||
|
||||
stream = httpx.stream(
|
||||
'GET',
|
||||
'https://ntfy.pyrocufflink.blue/chase2fa/raw',
|
||||
timeout=httpx.Timeout(5, read=None),
|
||||
)
|
||||
with stream as r:
|
||||
for line in r.iter_lines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
m = re.search(r'\d{4,}', line)
|
||||
if m:
|
||||
print(m.group(0))
|
||||
break
|
|
@ -0,0 +1,19 @@
|
|||
#!/bin/sh
|
||||
|
||||
if [ $$ -eq 1 ]; then
|
||||
exec tini "$0" -- "$@"
|
||||
fi
|
||||
|
||||
if [ -z "${SECRET_SOCKET_PATH}" ] || [ ! -e "${SECRET_SOCKET_PATH}" ]; then
|
||||
export SECRET_SOCKET_PATH="${SECRET_SOCKET_PATH:-/tmp/.secretsocket}"
|
||||
secretsocket &
|
||||
sspid=$!
|
||||
fi
|
||||
|
||||
xvfb-run -e /dev/stderr -s '-screen 0 1920x1080x24 -nolisten unix' xactfetch "$@"
|
||||
rc=$?
|
||||
|
||||
if [ -n "${sspid}" ]; then
|
||||
kill $sspid
|
||||
fi
|
||||
exit $rc
|
|
@ -11,6 +11,7 @@ classifiers = [
|
|||
"Programming Language :: Python :: 3",
|
||||
]
|
||||
dependencies = [
|
||||
"httpx~=0.27.0",
|
||||
"playwright~=1.32",
|
||||
"requests~=2.29.0",
|
||||
]
|
||||
|
@ -28,3 +29,7 @@ build-backend = "setuptools.build_meta"
|
|||
[tool.pyright]
|
||||
venvPath = '.'
|
||||
venv = '.venv'
|
||||
|
||||
[tool.black]
|
||||
line-length = 79
|
||||
skip-string-normalization = true
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
log = logging.getLogger('secretsocket')
|
||||
|
||||
|
||||
ALLOW_UNKNOWN_PEER = os.environ.get('ALLOW_UNKNOWN_PEER') == '1'
|
||||
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
|
||||
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
|
||||
|
||||
|
||||
class Secret:
|
||||
async def lookup(self) -> Optional[bytes]:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class EnvSecret(Secret):
|
||||
def __init__(self, env_var: str) -> None:
|
||||
self.env_var = env_var
|
||||
|
||||
async def lookup(self) -> Optional[bytes]:
|
||||
return os.environb.get(self.env_var.encode('utf-8'))
|
||||
|
||||
|
||||
class ExecSecret(Secret):
|
||||
def __init__(self, cmd: str) -> None:
|
||||
self.cmd = cmd
|
||||
|
||||
async def lookup(self) -> Optional[bytes]:
|
||||
args = shlex.split(self.cmd)
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*args,
|
||||
stdin=asyncio.subprocess.DEVNULL,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
)
|
||||
o = await proc.communicate()
|
||||
return o[0]
|
||||
|
||||
|
||||
class PathSecret(Secret):
|
||||
def __init__(self, path: Path) -> None:
|
||||
self.path = path
|
||||
|
||||
async def lookup(self) -> Optional[bytes]:
|
||||
try:
|
||||
f = self.path.expanduser().open('rb')
|
||||
except OSError as e:
|
||||
log.error('Failed to read secret from %s: %s', self.path, e)
|
||||
return None
|
||||
with f:
|
||||
return await asyncio.to_thread(f.read)
|
||||
|
||||
|
||||
class StringSecret(Secret):
|
||||
def __init__(self, value: str) -> None:
|
||||
self.value = value
|
||||
|
||||
async def lookup(self) -> Optional[bytes]:
|
||||
return self.value.encode('utf-8')
|
||||
|
||||
|
||||
class SecretServer:
|
||||
def __init__(self, path: Optional[Path] = None) -> None:
|
||||
self.path = path
|
||||
|
||||
async def handle_client(
|
||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
):
|
||||
try:
|
||||
sock = writer.get_extra_info('socket')
|
||||
pid, uid, gid = get_socket_peercred(sock)
|
||||
except Exception as e:
|
||||
log.error('Failed to get peer credentials: %s', e)
|
||||
pid, uid, gid = None, None, None
|
||||
log.debug('Client connected (pid %d, uid %d, gid %d)', pid, uid, gid)
|
||||
|
||||
if uid is None:
|
||||
if ALLOW_UNKNOWN_PEER:
|
||||
log.warning('Handling connection from unknown peer')
|
||||
else:
|
||||
log.error('Refusing to handle connection from unknown peer')
|
||||
writer.close()
|
||||
return
|
||||
else:
|
||||
my_uid = os.getresuid()[1]
|
||||
if uid != my_uid:
|
||||
log.error(
|
||||
'Refusing to handle connection from PID %d: '
|
||||
'peer UID %d does not match %d',
|
||||
pid,
|
||||
uid,
|
||||
my_uid,
|
||||
)
|
||||
writer.close()
|
||||
return
|
||||
|
||||
while 1:
|
||||
try:
|
||||
key = (await reader.readuntil(b'\n')).rstrip(b'\n').decode()
|
||||
except asyncio.IncompleteReadError:
|
||||
break
|
||||
else:
|
||||
log.info('Client %d requested secret %s', pid, key)
|
||||
try:
|
||||
secret = await self.get_secret(key)
|
||||
except Exception:
|
||||
log.exception('Failed to get secret:')
|
||||
writer.close()
|
||||
return
|
||||
else:
|
||||
writer.write(secret + b'\n')
|
||||
await writer.drain()
|
||||
log.debug('Client disconnected')
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
async def get_secret(self, key: str) -> bytes:
|
||||
secrets = await load_secrets(self.path)
|
||||
if secret := secrets.get(key):
|
||||
if value := await secret.lookup():
|
||||
return value
|
||||
else:
|
||||
log.warning('Lookup of secret %s failed', key)
|
||||
else:
|
||||
log.warning('Unknown secret: %s', key)
|
||||
return b''
|
||||
|
||||
|
||||
def get_socket_peercred(sock: socket.socket) -> tuple[int, int, int]:
|
||||
struct_ucred = '=iii'
|
||||
buflen = struct.calcsize(struct_ucred)
|
||||
cred = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, buflen)
|
||||
return struct.unpack(struct_ucred, cred)
|
||||
|
||||
|
||||
async def load_secrets(path: Optional[Path] = None) -> dict[str, Secret]:
|
||||
if path is None:
|
||||
path = Path('secrets.toml')
|
||||
secrets = {}
|
||||
try:
|
||||
f = path.open('rb')
|
||||
except OSError as e:
|
||||
log.error('Failed to load secrets: %s', e)
|
||||
return secrets
|
||||
with f:
|
||||
config = await asyncio.to_thread(tomllib.load, f)
|
||||
for key, value in config.items():
|
||||
if 'env' in value:
|
||||
secrets[key] = EnvSecret(value['env'])
|
||||
elif 'exec' in value:
|
||||
secrets[key] = ExecSecret(value['exec'])
|
||||
elif 'path' in value:
|
||||
secrets[key] = PathSecret(Path(value['path']))
|
||||
elif 'string' in value:
|
||||
secrets[key] = StringSecret(value['string'])
|
||||
else:
|
||||
log.warning(
|
||||
'Unsupported configuration for secret %s: %r', key, value
|
||||
)
|
||||
return secrets
|
||||
|
||||
|
||||
def shutdown(signum, server):
|
||||
log.info('Received signal %d, shutting down', signum)
|
||||
server.close()
|
||||
|
||||
|
||||
async def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
if SECRET_SOCKET_PATH:
|
||||
sock_path = Path(SECRET_SOCKET_PATH)
|
||||
elif XDG_RUNTIME_DIR:
|
||||
sock_path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
|
||||
else:
|
||||
sock_path = Path('/tmp/.secretsocket')
|
||||
|
||||
if not sock_path.parent.exists():
|
||||
sock_path.parent.mkdir()
|
||||
|
||||
if sock_path.exists():
|
||||
sock_path.unlink()
|
||||
|
||||
ss = SecretServer()
|
||||
server = await asyncio.start_unix_server(ss.handle_client, path=sock_path)
|
||||
async with server:
|
||||
await server.start_serving()
|
||||
loop = asyncio.get_running_loop()
|
||||
for signum in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(signum, shutdown, signum, server)
|
||||
await server.wait_closed()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
521
xactfetch.py
521
xactfetch.py
|
@ -1,30 +1,35 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import copy
|
||||
import datetime
|
||||
import getpass
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import subprocess
|
||||
import urllib.parse
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import Any, Optional, Type
|
||||
|
||||
import requests
|
||||
from playwright.sync_api import Page
|
||||
from playwright.sync_api import sync_playwright
|
||||
import httpx
|
||||
from playwright.async_api import Playwright, Page
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
|
||||
log = logging.getLogger('xactfetch')
|
||||
|
||||
|
||||
NTFY_URL = os.environ['NTFY_URL']
|
||||
NTFY_TOPIC = os.environ['NTFY_TOPIC']
|
||||
FIREFLY_III_URL = os.environ['FIREFLY_III_URL']
|
||||
FIREFLY_III_IMPORTER_URL = os.environ['FIREFLY_IMPORT_URL']
|
||||
NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
|
||||
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'dustin')
|
||||
FIREFLY_III_URL = os.environ.get(
|
||||
'FIREFLY_III_URL', 'https://firefly.pyrocufflink.blue'
|
||||
)
|
||||
FIREFLY_III_IMPORTER_URL = os.environ.get(
|
||||
'FIREFLY_IMPORT_URL', 'https://firefly-importer.pyrocufflink.blue'
|
||||
)
|
||||
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
|
||||
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
|
||||
|
||||
ACCOUNTS = {
|
||||
'commerce': {
|
||||
|
@ -45,8 +50,9 @@ class FireflyImporter:
|
|||
self.url = url
|
||||
self.secret = secret
|
||||
self.auth = auth
|
||||
self.client = httpx.AsyncClient()
|
||||
|
||||
def import_csv(
|
||||
async def import_csv(
|
||||
self,
|
||||
csv: Path,
|
||||
config: dict[str, Any],
|
||||
|
@ -54,24 +60,58 @@ class FireflyImporter:
|
|||
log.debug('Importing transactions from %s to Firefly III', csv)
|
||||
url = f'{self.url.rstrip("/")}/autoupload'
|
||||
with csv.open('rb') as f:
|
||||
r = requests.post(
|
||||
async with httpx.AsyncClient(auth=self.auth) as client:
|
||||
r = await client.post(
|
||||
url,
|
||||
auth=self.auth,
|
||||
headers={
|
||||
'Accept': 'application/json',
|
||||
},
|
||||
params={
|
||||
'secret': self.secret,
|
||||
},
|
||||
headers={
|
||||
'Accept': 'application/json',
|
||||
},
|
||||
files={
|
||||
'importable': ('import.csv', f),
|
||||
'json': ('import.json', json.dumps(config)),
|
||||
},
|
||||
timeout=300,
|
||||
)
|
||||
r.raise_for_status()
|
||||
|
||||
|
||||
def ntfy(
|
||||
class SecretsClient:
|
||||
def __init__(self) -> None:
|
||||
self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter]
|
||||
|
||||
async def __aenter__(self) -> 'SecretsClient':
|
||||
if not hasattr(self, 'sock'):
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[Exception]],
|
||||
exc_value: Optional[Exception],
|
||||
tb: Optional[TracebackType],
|
||||
) -> bool:
|
||||
self.sock[1].close()
|
||||
return False
|
||||
|
||||
async def connect(self) -> None:
|
||||
if SECRET_SOCKET_PATH:
|
||||
path = Path(SECRET_SOCKET_PATH)
|
||||
elif XDG_RUNTIME_DIR:
|
||||
path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
|
||||
else:
|
||||
path = '.secretsocket'
|
||||
self.sock = await asyncio.open_unix_connection(str(path))
|
||||
|
||||
async def get_secret(self, key: str) -> bytes:
|
||||
self.sock[1].write(f'{key}\n'.encode('utf-8'))
|
||||
buf = await self.sock[0].read(64 * 2**10)
|
||||
return buf.rstrip(b'\n')
|
||||
|
||||
|
||||
async def ntfy(
|
||||
message: Optional[str] = None,
|
||||
topic: str = NTFY_TOPIC,
|
||||
title: Optional[str] = None,
|
||||
|
@ -86,6 +126,7 @@ def ntfy(
|
|||
if tags:
|
||||
headers['Tags'] = tags
|
||||
url = f'{NTFY_URL}/{topic}'
|
||||
client = httpx.AsyncClient()
|
||||
if attach:
|
||||
if filename:
|
||||
headers['Filename'] = filename
|
||||
|
@ -97,71 +138,22 @@ def ntfy(
|
|||
else:
|
||||
message = message.replace('\n', '\\n')
|
||||
headers['Message'] = message
|
||||
r = requests.put(
|
||||
async with client:
|
||||
r = await client.put(
|
||||
url,
|
||||
headers=headers,
|
||||
data=attach,
|
||||
content=attach,
|
||||
)
|
||||
else:
|
||||
r = requests.post(
|
||||
async with client:
|
||||
r = await client.post(
|
||||
url,
|
||||
headers=headers,
|
||||
data=message,
|
||||
content=message,
|
||||
)
|
||||
r.raise_for_status()
|
||||
|
||||
|
||||
def rbw_unlocked() -> bool:
|
||||
log.debug('Checking if rbw vault is locked')
|
||||
cmd = ['rbw', 'unlocked']
|
||||
p = subprocess.run(cmd, check=False, stdout=subprocess.DEVNULL)
|
||||
unlocked = p.returncode == 0
|
||||
log.info('rbw vault is %s', 'unlocked' if unlocked else 'locked')
|
||||
return unlocked
|
||||
|
||||
|
||||
def rbw_get(
|
||||
name: str, folder: Optional[str] = None, username: Optional[str] = None
|
||||
) -> str:
|
||||
log.info(
|
||||
'Getting password for Bitwarden vault item '
|
||||
'%s (folder: %s, username: %s)',
|
||||
name,
|
||||
folder,
|
||||
username,
|
||||
)
|
||||
cmd = ['rbw', 'get']
|
||||
if folder is not None:
|
||||
cmd += ('--folder', folder)
|
||||
cmd.append(name)
|
||||
if username is not None:
|
||||
cmd.append(username)
|
||||
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
|
||||
assert p.stdout is not None
|
||||
return p.stdout.rstrip('\n')
|
||||
|
||||
|
||||
def rbw_code(
|
||||
name: str, folder: Optional[str] = None, username: Optional[str] = None
|
||||
) -> str:
|
||||
log.info(
|
||||
'Getting OTP code for Bitwarden vault item '
|
||||
'%s (folder: %s, username: %s)',
|
||||
name,
|
||||
folder,
|
||||
username,
|
||||
)
|
||||
cmd = ['rbw', 'code']
|
||||
if folder is not None:
|
||||
cmd += ('--folder', folder)
|
||||
cmd.append(name)
|
||||
if username is not None:
|
||||
cmd.append(username)
|
||||
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
|
||||
assert p.stdout is not None
|
||||
return p.stdout.rstrip('\n')
|
||||
|
||||
|
||||
def rfc2047_base64encode(
|
||||
message: str,
|
||||
) -> str:
|
||||
|
@ -169,21 +161,16 @@ def rfc2047_base64encode(
|
|||
return f"=?UTF-8?B?{encoded}?="
|
||||
|
||||
|
||||
def secret_from_file(env: str, default: str) -> str:
|
||||
filename = os.environ.get(env, default)
|
||||
log.debug('Loading secret value from %s', filename)
|
||||
with open(filename, 'r', encoding='utf-8') as f:
|
||||
return f.read().rstrip()
|
||||
|
||||
|
||||
def get_last_transaction_date(key: int, token: str) -> datetime.date:
|
||||
async def get_last_transaction_date(key: int, token: str) -> datetime.date:
|
||||
url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions'
|
||||
r = requests.get(
|
||||
async with httpx.AsyncClient() as client:
|
||||
r = await client.get(
|
||||
url,
|
||||
headers={
|
||||
'Authorization': f'Bearer {token}',
|
||||
'Accept': 'application/vnd.api+json',
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
r.raise_for_status()
|
||||
last_date = datetime.datetime.min
|
||||
|
@ -204,13 +191,17 @@ def get_last_transaction_date(key: int, token: str) -> datetime.date:
|
|||
return last_date.date()
|
||||
|
||||
|
||||
def download_chase(
|
||||
page: Page, end_date: datetime.date, token: str, importer: FireflyImporter
|
||||
async def download_chase(
|
||||
page: Page,
|
||||
secrets: SecretsClient,
|
||||
end_date: datetime.date,
|
||||
token: str,
|
||||
importer: FireflyImporter,
|
||||
) -> bool:
|
||||
with Chase(page) as c, ntfyerror('Chase', page) as r:
|
||||
async with Chase(page, secrets) as c, ntfyerror('Chase', page) as r:
|
||||
key = ACCOUNTS['chase']
|
||||
try:
|
||||
start_date = get_last_transaction_date(key, token)
|
||||
start_date = await get_last_transaction_date(key, token)
|
||||
except (OSError, ValueError) as e:
|
||||
log.error(
|
||||
'Skipping Chase account: could not get last transaction: %s',
|
||||
|
@ -223,25 +214,28 @@ def download_chase(
|
|||
start_date,
|
||||
)
|
||||
return True
|
||||
c.login()
|
||||
csv = c.download_transactions(start_date, end_date)
|
||||
await c.login()
|
||||
csv = await c.download_transactions(start_date, end_date)
|
||||
log.info('Importing transactions from Chase into Firefly III')
|
||||
c.firefly_import(csv, key, importer)
|
||||
await c.firefly_import(csv, key, importer)
|
||||
return r.success
|
||||
|
||||
|
||||
def download_commerce(
|
||||
async def download_commerce(
|
||||
page: Page,
|
||||
secrets: SecretsClient,
|
||||
end_date: datetime.date,
|
||||
token: str,
|
||||
importer: FireflyImporter,
|
||||
) -> bool:
|
||||
log.info('Downloading transaction lists from Commerce Bank')
|
||||
csvs = []
|
||||
with CommerceBank(page) as c, ntfyerror('Commerce Bank', page) as r:
|
||||
async with CommerceBank(page, secrets) as c, ntfyerror(
|
||||
'Commerce Bank', page
|
||||
) as r:
|
||||
for name, key in ACCOUNTS['commerce'].items():
|
||||
try:
|
||||
start_date = get_last_transaction_date(key, token)
|
||||
start_date = await get_last_transaction_date(key, token)
|
||||
except (OSError, ValueError) as e:
|
||||
log.error(
|
||||
'Skipping account %s: could not get last transaction: %s',
|
||||
|
@ -261,12 +255,14 @@ def download_commerce(
|
|||
start_date,
|
||||
name,
|
||||
)
|
||||
c.login()
|
||||
c.open_account(name)
|
||||
csvs.append((key, c.download_transactions(start_date, end_date)))
|
||||
await c.login()
|
||||
await c.open_account(name)
|
||||
csvs.append(
|
||||
(key, await c.download_transactions(start_date, end_date))
|
||||
)
|
||||
log.info('Importing transactions from Commerce Bank into Firefly III')
|
||||
for key, csv in csvs:
|
||||
c.firefly_import(csv, key, importer)
|
||||
await c.firefly_import(csv, key, importer)
|
||||
return r.success
|
||||
|
||||
|
||||
|
@ -276,10 +272,10 @@ class ntfyerror:
|
|||
self.page = page
|
||||
self.success = True
|
||||
|
||||
def __enter__(self) -> 'ntfyerror':
|
||||
async def __aenter__(self) -> 'ntfyerror':
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[Exception]],
|
||||
exc_value: Optional[Exception],
|
||||
|
@ -292,9 +288,13 @@ class ntfyerror:
|
|||
)
|
||||
if os.environ.get('DEBUG_NTFY', '1') == '0':
|
||||
return True
|
||||
if ss := self.page.screenshot():
|
||||
save_screenshot(ss)
|
||||
ntfy(
|
||||
try:
|
||||
if ss := await self.page.screenshot():
|
||||
await asyncio.to_thread(save_screenshot, ss)
|
||||
except Exception:
|
||||
log.exception('Failed to get screenshot:')
|
||||
ss = None
|
||||
await ntfy(
|
||||
message=str(exc_value),
|
||||
title=f'xactfetch failed for {self.bank}',
|
||||
tags='warning',
|
||||
|
@ -367,107 +367,114 @@ class CommerceBank:
|
|||
'ignore_duplicate_transactions': True,
|
||||
}
|
||||
|
||||
def __init__(self, page: Page) -> None:
|
||||
def __init__(self, page: Page, secrets: SecretsClient) -> None:
|
||||
self.page = page
|
||||
self.username = 'admiraln3mo'
|
||||
self.vault_item = 'Commerce Bank'
|
||||
self.vault_folder = 'Websites'
|
||||
self.secrets = secrets
|
||||
self._logged_in = False
|
||||
|
||||
def __enter__(self) -> 'CommerceBank':
|
||||
async def __aenter__(self) -> 'CommerceBank':
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[Exception]],
|
||||
exc_value: Optional[Exception],
|
||||
tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
self.logout()
|
||||
await self.logout()
|
||||
|
||||
def login(self) -> None:
|
||||
async def login(self) -> None:
|
||||
if self._logged_in:
|
||||
return
|
||||
log.debug('Navigating to %s', self.URL)
|
||||
self.page.goto(self.URL)
|
||||
password = rbw_get(self.vault_item, self.vault_folder, self.username)
|
||||
await self.page.goto(self.URL)
|
||||
username = await self.get_secret('bank.commerce.username')
|
||||
password = await self.get_secret('bank.commerce.password')
|
||||
log.debug('Filling username/password login form')
|
||||
self.page.get_by_role('textbox', name='Customer ID').fill(
|
||||
self.username
|
||||
await self.page.get_by_role('textbox', name='Customer ID').fill(
|
||||
username
|
||||
)
|
||||
self.page.get_by_role('textbox', name='Password').fill(password)
|
||||
self.page.get_by_role('button', name='Log In').click()
|
||||
await self.page.get_by_role('textbox', name='Password').fill(password)
|
||||
await self.page.get_by_role('button', name='Log In').click()
|
||||
log.debug('Waiting for OTP 2FA form')
|
||||
otp_input = self.page.locator('id=securityCodeInput')
|
||||
otp_input.wait_for()
|
||||
self.page.wait_for_timeout(random.randint(1000, 3000))
|
||||
await otp_input.wait_for()
|
||||
await self.page.wait_for_timeout(random.randint(1000, 3000))
|
||||
log.debug('Filling OTP 2FA form')
|
||||
otp = rbw_code(self.vault_item, self.vault_folder, self.username)
|
||||
otp_input.fill(otp)
|
||||
with self.page.expect_event('load'):
|
||||
self.page.get_by_role('button', name='Continue').click()
|
||||
otp = await self.get_secret('bank.commerce.otp')
|
||||
await otp_input.fill(otp)
|
||||
async with self.page.expect_event('load'):
|
||||
await self.page.get_by_role('button', name='Continue').click()
|
||||
log.debug('Waiting for page load')
|
||||
self.page.wait_for_load_state()
|
||||
await self.page.wait_for_load_state()
|
||||
cur_url = urllib.parse.urlparse(self.page.url)
|
||||
if cur_url.path != '/CBI/Accounts/Summary':
|
||||
new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='')
|
||||
self.page.goto(urllib.parse.urlunparse(new_url))
|
||||
await self.page.goto(urllib.parse.urlunparse(new_url))
|
||||
log.info('Successfully logged in to Commerce Bank')
|
||||
self._logged_in = True
|
||||
|
||||
def logout(self) -> None:
|
||||
async def logout(self) -> None:
|
||||
if not self._logged_in:
|
||||
return
|
||||
log.debug('Logging out of Commerce Bank')
|
||||
with self.page.expect_event('load'):
|
||||
self.page.get_by_test_id('navWrap').get_by_text('Logout').click()
|
||||
async with self.page.expect_event('load'):
|
||||
await self.page.get_by_test_id('navWrap').get_by_text(
|
||||
'Logout'
|
||||
).click()
|
||||
log.info('Logged out of Commerce Bank')
|
||||
|
||||
def open_account(self, account: str) -> None:
|
||||
async def open_account(self, account: str) -> None:
|
||||
log.debug('Navigating to activity page for account %s', account)
|
||||
if '/Activity/' in self.page.url:
|
||||
self.page.get_by_role('button', name='My Accounts').click()
|
||||
with self.page.expect_event('load'):
|
||||
self.page.get_by_role('link', name=account).click()
|
||||
self.page.wait_for_load_state()
|
||||
self.page.wait_for_timeout(random.randint(1000, 3000))
|
||||
await self.page.get_by_role('button', name='My Accounts').click()
|
||||
async with self.page.expect_event('load'):
|
||||
await self.page.get_by_role('link', name=account).click()
|
||||
await self.page.wait_for_load_state()
|
||||
await self.page.wait_for_timeout(random.randint(1000, 3000))
|
||||
log.info('Loaded activity page for account %s', account)
|
||||
|
||||
def download_transactions(
|
||||
async def download_transactions(
|
||||
self, from_date: datetime.date, to_date: datetime.date
|
||||
) -> Path:
|
||||
log.info('Downloading transactions from %s to %s', from_date, to_date)
|
||||
datefmt = '%m/%d/%Y'
|
||||
self.page.get_by_role('link', name='Download Transactions').click()
|
||||
self.page.wait_for_timeout(random.randint(750, 1250))
|
||||
await self.page.get_by_role(
|
||||
'link', name='Download Transactions'
|
||||
).click()
|
||||
await self.page.wait_for_timeout(random.randint(750, 1250))
|
||||
modal = self.page.locator('#download-transactions')
|
||||
input_from = modal.locator('input[data-qaid=fromDate]')
|
||||
input_from.click()
|
||||
self.page.keyboard.press('Control+A')
|
||||
self.page.keyboard.press('Delete')
|
||||
self.page.keyboard.type(from_date.strftime(datefmt))
|
||||
await input_from.click()
|
||||
await self.page.keyboard.press('Control+A')
|
||||
await self.page.keyboard.press('Delete')
|
||||
await self.page.keyboard.type(from_date.strftime(datefmt))
|
||||
input_to = modal.locator('input[data-qaid=toDate]')
|
||||
input_to.click()
|
||||
self.page.keyboard.press('Control+A')
|
||||
self.page.keyboard.press('Delete')
|
||||
self.page.keyboard.type(to_date.strftime(datefmt))
|
||||
modal.get_by_role('button', name='Select Type').click()
|
||||
self.page.get_by_text('Comma Separated').click()
|
||||
with self.page.expect_download() as di:
|
||||
self.page.get_by_role('button', name='Download').click()
|
||||
await input_to.click()
|
||||
await self.page.keyboard.press('Control+A')
|
||||
await self.page.keyboard.press('Delete')
|
||||
await self.page.keyboard.type(to_date.strftime(datefmt))
|
||||
await modal.get_by_role('button', name='Select Type').click()
|
||||
await self.page.get_by_text('Comma Separated').click()
|
||||
async with self.page.expect_download() as di:
|
||||
await self.page.get_by_role('button', name='Download').click()
|
||||
log.debug('Waiting for download to complete')
|
||||
path = di.value.path()
|
||||
path = await (await di.value).path()
|
||||
assert path
|
||||
log.info('Downloaded transactions to %s', path)
|
||||
modal.get_by_label('Close').click()
|
||||
await modal.get_by_label('Close').click()
|
||||
return path
|
||||
|
||||
def firefly_import(
|
||||
async def firefly_import(
|
||||
self, csv: Path, account: int, importer: FireflyImporter
|
||||
) -> None:
|
||||
config = copy.deepcopy(self.IMPORT_CONFIG)
|
||||
config['default_account'] = account
|
||||
importer.import_csv(csv, config)
|
||||
await importer.import_csv(csv, config)
|
||||
|
||||
async def get_secret(self, key: str) -> str:
|
||||
secret = await self.secrets.get_secret(key)
|
||||
return secret.decode()
|
||||
|
||||
|
||||
class Chase:
|
||||
|
@ -526,134 +533,163 @@ class Chase:
|
|||
'ignore_duplicate_transactions': True,
|
||||
}
|
||||
|
||||
def __init__(self, page: Page) -> None:
|
||||
def __init__(self, page: Page, secrets: SecretsClient) -> None:
|
||||
self.page = page
|
||||
self.username = 'AdmiralN3mo'
|
||||
self.vault_item = 'Chase'
|
||||
self.vault_folder = 'Websites'
|
||||
self.secrets = secrets
|
||||
self.saved_cookies = Path('cookies.json')
|
||||
self._logged_in = False
|
||||
|
||||
def __enter__(self) -> 'Chase':
|
||||
self.load_cookies()
|
||||
async def __aenter__(self) -> 'Chase':
|
||||
await self.load_cookies()
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[Exception]],
|
||||
exc_value: Optional[Exception],
|
||||
tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
try:
|
||||
self.logout()
|
||||
await self.logout()
|
||||
finally:
|
||||
self.save_cookies()
|
||||
await self.save_cookies()
|
||||
|
||||
def load_cookies(self) -> None:
|
||||
async def load_cookies(self) -> None:
|
||||
log.debug('Loading saved cookies from %s', self.saved_cookies)
|
||||
try:
|
||||
with self.saved_cookies.open(encoding='utf-8') as f:
|
||||
self.page.context.add_cookies(json.load(f))
|
||||
except:
|
||||
log.warning(
|
||||
'Could not load saved cookies, '
|
||||
'SMS verification will be required!'
|
||||
)
|
||||
cookies = await asyncio.to_thread(json.load, f)
|
||||
await self.page.context.add_cookies(cookies)
|
||||
except Exception as e:
|
||||
log.debug('Failed to load saved cookies: %s', e)
|
||||
else:
|
||||
log.info('Successfully loaded saved cookies')
|
||||
|
||||
def save_cookies(self) -> None:
|
||||
async def save_cookies(self) -> None:
|
||||
log.debug('Saving cookies from %s', self.saved_cookies)
|
||||
try:
|
||||
with self.saved_cookies.open('w', encoding='utf-8') as f:
|
||||
f.write(json.dumps(self.page.context.cookies()))
|
||||
cookies = await self.page.context.cookies()
|
||||
f.write(await asyncio.to_thread(json.dumps, cookies))
|
||||
except Exception as e:
|
||||
log.error('Failed to save cookies: %s', e)
|
||||
else:
|
||||
log.info('Successfully saved cookies to %s', self.saved_cookies)
|
||||
|
||||
def login(self) -> None:
|
||||
async def login(self) -> None:
|
||||
if self._logged_in:
|
||||
return
|
||||
log.debug('Navigating to %s', self.URL)
|
||||
self.page.goto(self.URL)
|
||||
self.page.wait_for_load_state()
|
||||
self.page.wait_for_timeout(random.randint(2000, 4000))
|
||||
password = rbw_get(self.vault_item, self.vault_folder, self.username)
|
||||
await self.page.goto(self.URL)
|
||||
await self.page.wait_for_load_state()
|
||||
await self.page.wait_for_timeout(random.randint(2000, 4000))
|
||||
username = await self.get_secret('bank.chase.username')
|
||||
password = await self.get_secret('bank.chase.password')
|
||||
log.debug('Filling username/password login form')
|
||||
self.page.frame_locator('#logonbox').get_by_label('Username').fill(
|
||||
self.username
|
||||
)
|
||||
self.page.frame_locator('#logonbox').get_by_label('Password').fill(
|
||||
password
|
||||
)
|
||||
self.page.wait_for_timeout(random.randint(500, 750))
|
||||
self.page.frame_locator('#logonbox').get_by_role(
|
||||
'button', name='Sign in'
|
||||
).click()
|
||||
logonbox = self.page.frame_locator('#logonbox')
|
||||
await logonbox.get_by_label('Username').fill(username)
|
||||
await logonbox.get_by_label('Password').fill(password)
|
||||
await self.page.wait_for_timeout(random.randint(500, 750))
|
||||
await logonbox.get_by_role('button', name='Sign in').click()
|
||||
log.debug('Waiting for page load')
|
||||
self.page.wait_for_load_state()
|
||||
self.page.get_by_role('button', name='Pay Card').wait_for(
|
||||
timeout=120000
|
||||
await self.page.wait_for_load_state()
|
||||
logonframe = self.page.frame_locator('iframe[title="logon"]')
|
||||
t_2fa = asyncio.create_task(
|
||||
logonframe.get_by_role(
|
||||
'heading', name="We don't recognize this device"
|
||||
).wait_for()
|
||||
)
|
||||
t_finished = asyncio.create_task(
|
||||
self.page.get_by_role('button', name='Pay Card').wait_for()
|
||||
)
|
||||
done, pending = await asyncio.wait(
|
||||
(t_2fa, t_finished),
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
for t in done:
|
||||
await t
|
||||
if t_2fa in done:
|
||||
log.warning('Device verification (SMS 2-factor auth) required')
|
||||
await logonframe.get_by_label('Tell us how: Choose one').click()
|
||||
await logonframe.locator(
|
||||
'#container-1-simplerAuth-dropdownoptions-styledselect'
|
||||
).click()
|
||||
otp_task = asyncio.create_task(self.get_secret('bank.chase.otp'))
|
||||
await logonframe.get_by_role('button', name='Next').click()
|
||||
log.info('Waiting for SMS verification code')
|
||||
otp = await otp_task
|
||||
log.debug('Filling verification code form')
|
||||
await logonframe.get_by_label('One-time code').fill(otp)
|
||||
await logonframe.get_by_label('Password').fill(password)
|
||||
await logonframe.get_by_role('button', name='Next').click()
|
||||
await self.page.wait_for_load_state()
|
||||
await self.page.get_by_role('button', name='Pay Card').wait_for()
|
||||
log.info('Successfully logged in to Chase')
|
||||
self._logged_in = True
|
||||
|
||||
def download_transactions(
|
||||
async def download_transactions(
|
||||
self, from_date: datetime.date, to_date: datetime.date
|
||||
) -> Path:
|
||||
log.info('Downloading transactions from %s to %s', from_date, to_date)
|
||||
fmt = '%m/%d/%Y'
|
||||
self.page.locator('#CARD_ACCOUNTS').get_by_role(
|
||||
await self.page.locator('#CARD_ACCOUNTS').get_by_role(
|
||||
'button', name='(...2467)'
|
||||
).first.click()
|
||||
fl = self.page.locator('#flyout')
|
||||
fl.wait_for()
|
||||
fl.get_by_role('button', name='Pay card', exact=True).wait_for()
|
||||
fl.get_by_role(
|
||||
await fl.wait_for()
|
||||
await fl.get_by_role('button', name='Pay card', exact=True).wait_for()
|
||||
await fl.get_by_role(
|
||||
'button', name='Account activity', exact=True
|
||||
).wait_for()
|
||||
fl.get_by_role('link', name='Show details').wait_for()
|
||||
fl.get_by_role('link', name='Activity since last statement').click()
|
||||
fl.get_by_role('link', name='All transactions').click()
|
||||
fl.get_by_text('See more activity').wait_for()
|
||||
fl.get_by_role('button', name='Download Account Activity').click()
|
||||
await fl.get_by_role('link', name='Show details').wait_for()
|
||||
await fl.get_by_role(
|
||||
'link', name='Activity since last statement'
|
||||
).click()
|
||||
await fl.get_by_role('link', name='All transactions').click()
|
||||
await fl.get_by_text('See more activity').wait_for()
|
||||
await fl.get_by_role(
|
||||
'button', name='Download Account Activity'
|
||||
).click()
|
||||
log.debug('Filling account activity download form')
|
||||
self.page.locator('#select-downloadActivityOptionId-label').click()
|
||||
self.page.get_by_text('Choose a date range').nth(1).locator(
|
||||
await self.page.locator(
|
||||
'#select-downloadActivityOptionId-label'
|
||||
).click()
|
||||
await self.page.get_by_text('Choose a date range').nth(1).locator(
|
||||
'../..'
|
||||
).click()
|
||||
self.page.wait_for_timeout(random.randint(500, 1500))
|
||||
self.page.locator('#accountActivityFromDate-input-input').fill(
|
||||
await self.page.wait_for_timeout(random.randint(500, 1500))
|
||||
await self.page.locator('#accountActivityFromDate-input-input').fill(
|
||||
from_date.strftime(fmt)
|
||||
)
|
||||
self.page.locator('#accountActivityFromDate-input-input').blur()
|
||||
self.page.wait_for_timeout(random.randint(500, 1500))
|
||||
self.page.locator('#accountActivityToDate-input-input').fill(
|
||||
await self.page.locator('#accountActivityFromDate-input-input').blur()
|
||||
await self.page.wait_for_timeout(random.randint(500, 1500))
|
||||
await self.page.locator('#accountActivityToDate-input-input').fill(
|
||||
to_date.strftime(fmt)
|
||||
)
|
||||
self.page.locator('#accountActivityToDate-input-input').blur()
|
||||
self.page.wait_for_timeout(random.randint(500, 1500))
|
||||
with self.page.expect_download(timeout=5000) as di:
|
||||
self.page.get_by_role(
|
||||
await self.page.locator('#accountActivityToDate-input-input').blur()
|
||||
await self.page.wait_for_timeout(random.randint(500, 1500))
|
||||
async with self.page.expect_download(timeout=5000) as di:
|
||||
await self.page.get_by_role(
|
||||
'button', name='Download', exact=True
|
||||
).click()
|
||||
log.debug('Waiting for download to complete')
|
||||
self.page.wait_for_timeout(random.randint(1000, 2500))
|
||||
path = di.value.path()
|
||||
await self.page.wait_for_timeout(random.randint(1000, 2500))
|
||||
path = await (await di.value).path()
|
||||
assert path
|
||||
log.info('Downloaded transactions to %s', path)
|
||||
return path
|
||||
|
||||
def logout(self) -> None:
|
||||
async def logout(self) -> None:
|
||||
if not self._logged_in:
|
||||
return
|
||||
log.debug('Logging out of Chase')
|
||||
with self.page.expect_event('load'):
|
||||
self.page.get_by_role('button', name='Sign out').click()
|
||||
async with self.page.expect_event('load'):
|
||||
await self.page.get_by_role('button', name='Sign out').click()
|
||||
log.info('Logged out of Chase')
|
||||
|
||||
def firefly_import(
|
||||
async def firefly_import(
|
||||
self, csv: Path, account: int, importer: FireflyImporter
|
||||
) -> None:
|
||||
config = copy.deepcopy(self.IMPORT_CONFIG)
|
||||
|
@ -668,37 +704,66 @@ class Chase:
|
|||
config['do_mapping'].pop(0)
|
||||
else:
|
||||
raise ValueError(f'Unexpected CSV schema: {headers}')
|
||||
importer.import_csv(csv, config)
|
||||
await importer.import_csv(csv, config)
|
||||
|
||||
async def get_secret(self, key: str) -> str:
|
||||
secret = await self.secrets.get_secret(key)
|
||||
return secret.decode()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
log.debug('Getting Firefly III access token from rbw vault')
|
||||
token = rbw_get('xactfetch')
|
||||
import_secret = secret_from_file(
|
||||
'FIREFLY_IMPORT_SECRET_FILE', 'import.secret'
|
||||
)
|
||||
async def fetch_transactions(pw: Playwright, secrets: SecretsClient) -> bool:
|
||||
log.debug('Getting Firefly III access token')
|
||||
token = (await secrets.get_secret('firefly.token')).decode()
|
||||
import_secret = (
|
||||
await secrets.get_secret('firefly.import.secret')
|
||||
).decode()
|
||||
import_auth = (
|
||||
os.environ.get('FIREFLY_IMPORT_USER', getpass.getuser()),
|
||||
secret_from_file('FIREFLY_IMPORT_PASSWORD_FILE', 'import.password'),
|
||||
(await secrets.get_secret('firefly.import.username')).decode(),
|
||||
(await secrets.get_secret('firefly.import.password')).decode(),
|
||||
)
|
||||
importer = FireflyImporter(
|
||||
FIREFLY_III_IMPORTER_URL, import_secret, import_auth
|
||||
)
|
||||
end_date = datetime.date.today() - datetime.timedelta(days=1)
|
||||
with sync_playwright() as pw:
|
||||
headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1'
|
||||
browser = pw.firefox.launch(headless=headless)
|
||||
page = browser.new_page()
|
||||
failed = False
|
||||
browser = await pw.chromium.launch(headless=False)
|
||||
context = await browser.new_context()
|
||||
await context.tracing.start(screenshots=True, snapshots=True)
|
||||
page = await context.new_page()
|
||||
banks = sys.argv[1:] or list(ACCOUNTS.keys())
|
||||
if 'commerce' in banks:
|
||||
if not download_commerce(page, end_date, token, importer):
|
||||
if not await download_commerce(
|
||||
page, secrets, end_date, token, importer
|
||||
):
|
||||
failed = True
|
||||
if 'chase' in banks:
|
||||
if not download_chase(page, end_date, token, importer):
|
||||
if not await download_chase(page, secrets, end_date, token, importer):
|
||||
failed = True
|
||||
if failed:
|
||||
await context.tracing.stop(path='trace.zip')
|
||||
with open('trace.zip', 'rb') as f:
|
||||
await ntfy(
|
||||
'Downloading one or more transaction lists failed.',
|
||||
attach=f.read(),
|
||||
filename='trace.zip',
|
||||
)
|
||||
return failed
|
||||
|
||||
|
||||
async def amain() -> None:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
async with SecretsClient() as secrets:
|
||||
try:
|
||||
async with async_playwright() as pw:
|
||||
failed = await fetch_transactions(pw, secrets)
|
||||
raise SystemExit(1 if failed else 0)
|
||||
except asyncio.exceptions.InvalidStateError as e:
|
||||
log.debug('Ignoring exception: %s', e, exc_info=sys.exc_info())
|
||||
|
||||
|
||||
def main():
|
||||
asyncio.run(amain())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue