1
0
Fork 0

Compare commits

...

11 Commits

Author SHA1 Message Date
Dustin bdcb8c93b6 xactfetch: Suppress asyncio InvalidStateError
dustin/xactfetch/pipeline/head This commit looks good Details
There is currently a [bug][0] in the Python Playwright API that causes
_asyncio_ to raise an `InvalidStateError` occasionally when the
`PlaywrightContextManager` exits.  This causes the program to exit
with a nonzero return code, even though it actually completed
successfully, which will cause the Job to be retried.  To avoid this,
we can catch and ignore the spurious exception.

I've reorganized the code a bit here because we have to wrap the whole
`with` block in the `try`/`except`; moving the contents of the block
into a function keeps the indentation level from getting out of control.

[0]: https://github.com/microsoft/playwright-python/issues/2238
2024-07-11 22:34:49 -05:00
Dustin 3ff18d1042 container: Add secretsocket, chase2fa scripts
While the original intent of the `secretsocket` script was to have `rbw`
run outside the `xactfetch` container, that is only useful during
development; both processes need to run in the container in Kubernetes.
2024-07-11 21:50:27 -05:00
Dustin 0f9b3a5ac5 secretsocket: Respect SECRET_SOCKET_PATH
The `secretsocket` server will now create its IPC soket at the location
specified by the `SECRET_SOCKET_PATH` environment variable, if set.
This way, both `secretsocket` and `xactfetch` can be pointed to the
same location with this single variable.
2024-07-11 21:50:27 -05:00
Dustin e4742f1c6e container: Optimize layer cache usage
With the addition of ancillary scripts like `entrypoint.sh`, the `COPY .`
instruction in the build stage results in a full rebuild of the final
image for every change.  To avoid this, we now only copy the files that
are actually required to build the wheel.  The other scripts are copied
later, using an intermediate layer.  This avoids needing a `COPY`
instruction, and therefore a new layer in the final image, for each
script.  Hypothetically, we could use `RUN --mount=bind` and copy the
files with the `install` command, but bind-mounting the build context
doesn't actually work; SELinux prevents the container builder from
accessing the source directory directly.
2024-07-11 21:50:27 -05:00
Dustin 76cb7c7958 container: Rebase on dch-base 2024-07-11 21:50:27 -05:00
Dustin bef7206642 entrypoint: Start secretsocket server if needed
If the `SECRET_SOCKET_PATH` environment variable is not set, or refers
to a non-existent path, then we assume we need to manage the
`secretsocket` server ourselves.
2024-07-11 21:50:27 -05:00
Dustin 28fe49c2b2 xactfetch: Save Playwright trace for failed runs
Playwright has a nifty feature called the [Trace Viewer][0], which you
can use to observe the state of the page at any given point during the
browsing session.  This should make troubleshooting failures a lot
easier.

[0]: https://playwright.dev/python/docs/trace-viewer-intro
2024-07-11 21:48:47 -05:00
Dustin 9f113d6a3f xactfetch: Switch to headed Chrome
Earlier this week, `xactfetch` stopped being able to log in to the Chase
website.  After logging in, the website just popped up a message that
said "It looks like this part of our website isn't working right now,"
with a hint that I should try a different browser.  I suspect they have
enhanced their bot detection/scraping resistance, because the error
only occurs when `xactfetch` is run from inside a container.  It happens
every time in that case, but never when I run it on my computer
directly.

After several hours of messing with this, the only way I was able to
get it to work is to use full-blown headed Chromium.  Neither headless
nor headed Firefox works, nor does headless Chromium.  This is a bit
cumbersome, but not really a big deal.  Headed Chromium works fine in
an Xvfb session.
2024-07-11 21:34:11 -05:00
Dustin 8de0d93eb1 xactfetch: chase: Handle SMS 2-factor auth
When logging in to the Chase website with a fresh browser profile, or
otherwise without any cookies, the user will be required to "validate
the device" using a one-time code delivered via SMS.  Previously, I
handled this by running the `xactfetch` script with a headed browser,
manually entering the verification code when the prompt came up.  Then,
I would copy the `cookies.json` file, now containing a cookie indicating
the device had been verified, to the Kubernetes volume, where it would
be used by the production pod.

Now that `xactfetch` uses asyncio, it is possible for the Chase `login`
method to wait for one of multiple conditions: either login succeeds,
or SMS 2FA is required.  In the case of the latter, we can get the
2FA code from the secret server and enter it into the form to complete
the login process.

The real magic here is how we're getting the 2FA code from the SMS
message.  There are two components to this.  First, I've installed [SMS
to URL Forwarder][0] on my phone.  This app does what it says on the
tin: it relays SMS messages to an HTTP(S) server.  I have configured it
to forward messages from the Chase SMS 2FA short code to an _ntfy_
topic.  The second component is the `chase2fa` script, which is called
by the secret server.  This script listens for notifications on the
_ntfy_ topic where the SMS messages are forwarded.  When a message
arrives, it extracts the verification code using a simple regular
expression that identifies a several-digit number.

With all these pieces in place, the `xactfetch` script is no longer
thwarted by the SMS 2FA barrier!

[0]: https://github.com/bogkonstantin/android_income_sms_gateway_webhook
2024-07-11 21:21:03 -05:00
Dustin 43aba0c848 Switch to async API
Using the Playwrigt async API is the only way to wait for one of
multiple conditions.  We will need this capability in order to detect
certain abnormal conditions, such as spurious 2FA auth or interstitial
ads.
2024-07-10 14:54:23 -05:00
Dustin b30b38f76f secretsocket: Handle secrets via external process
`xactfetch` has three different ways of reading secret values:

* From environment variables
* By reading the contents of a file (specified by environment variables)
* By looking them up in the Bitwarden vault

This is very cumbersome to work with, especially when trying to
troubleshoot using the container image locally.

To make this easier, I've factored out all secret lookup functionality
into a separate process.  This process listens on a UNIX socket and
implements a very simple secret lookup protocol.  The client
(`xactfetch` itself in this case) sends a string key, identifying the
secret it wants to look up, terminated by a single line feed character.
The `secretsocket` server looks up the secret associated with that key,
using the method defined in a TOML configuration file.  There are four
supported methods:

* Environment variables
* External programs
* File contents
* Static strings

The value returned by the corresponding method is then sent back to the
client via the socket connection, again as a string terminated with a
line feed.

Moving the secret handling into a separate process simplifies the
environment configuration needed in order to run `xactfetch`.  Notably,
when running it in a container, only the `secretsocket` soket needs to
be mounted into the container.  Since `rbw` is executed by the server
process now, rather than `xactfetch` directly, the vault does not need
to be present in the `xactfetch` container.  Indeed, none of the secret
values need to be present in the container.
2024-07-10 14:54:23 -05:00
8 changed files with 596 additions and 265 deletions

View File

@ -1,5 +1,8 @@
*
!.git
!chase2fa.py
!entrypoint.sh
!pinentry-stub.sh
!pyproject.toml
!secretsocket.py
!xactfetch.py

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
/.venv
/cookies.json
/screenshot_*.png
/secrets.toml
*.egg-info/
__pycache__/
*.py[co]

View File

@ -1,4 +1,4 @@
FROM registry.fedoraproject.org/fedora-minimal:38 AS build
FROM git.pyrocufflink.net/containerimages/dch-base AS build
RUN --mount=type=cache,target=/var/cache \
microdnf install -y \
@ -18,11 +18,22 @@ RUN --mount=type=cache,target=/var/cache \
python3-wheel \
&& :
COPY . /src
COPY .git /src/.git
COPY xactfetch.py pyproject.toml /src
RUN python3 -m pip wheel -w /wheels /src
FROM registry.fedoraproject.org/fedora-minimal:38
FROM scratch AS mixin
COPY pinentry-stub.sh /usr/local/bin/pinentry-stub
COPY secretsocket.py /usr/local/bin/secretsocket
COPY chase2fa.py /usr/local/bin/chase2fa
COPY entrypoint.sh /entrypoint.sh
FROM git.pyrocufflink.net/containerimages/dch-base
RUN --mount=type=cache,target=/var/cache \
microdnf install -y \
@ -47,11 +58,16 @@ RUN --mount=type=cache,target=/var/cache \
libXrandr \
libXrender \
libXtst \
libdrm \
libxcb \
mesa-libgbm \
nspr \
nss \
pango \
python3 \
python3-pip \
tini \
xorg-x11-server-Xvfb \
&& echo xactfetch:x:2468: >> /etc/group \
&& echo xactfetch:*:2468:2468:xactfetch:/var/lib/xactfetch:/sbin/nologin >> /etc/passwd \
&& :
@ -61,16 +77,15 @@ ENV PLAYWRIGHT_BROWSERS_PATH=/usr/local/playwright/browsers
RUN --mount=type=bind,from=build,source=/,target=/build \
python3 -m pip install --no-index -f /build/wheels xactfetch \
&& cp /build/root/.cargo/bin/rbw* /usr/local/bin/ \
&& install /build/src/pinentry-stub.sh /usr/local/bin/pinentry-stub \
&& playwright install firefox \
&& playwright install chromium \
&& :
COPY --from=mixin / /
VOLUME /var/lib/xactfetch
WORKDIR /var/lib/xactfetch
USER 2468:2468
ENV XDG_CONFIG_HOME=/etc
ENTRYPOINT ["tini", "xactfetch", "--"]
ENTRYPOINT ["/entrypoint.sh"]

19
chase2fa.py Executable file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
import re
import httpx
stream = httpx.stream(
'GET',
'https://ntfy.pyrocufflink.blue/chase2fa/raw',
timeout=httpx.Timeout(5, read=None),
)
with stream as r:
for line in r.iter_lines():
line = line.strip()
if not line:
continue
m = re.search(r'\d{4,}', line)
if m:
print(m.group(0))
break

19
entrypoint.sh Executable file
View File

@ -0,0 +1,19 @@
#!/bin/sh
if [ $$ -eq 1 ]; then
exec tini "$0" -- "$@"
fi
if [ -z "${SECRET_SOCKET_PATH}" ] || [ ! -e "${SECRET_SOCKET_PATH}" ]; then
export SECRET_SOCKET_PATH="${SECRET_SOCKET_PATH:-/tmp/.secretsocket}"
secretsocket &
sspid=$!
fi
xvfb-run -e /dev/stderr -s '-screen 0 1920x1080x24 -nolisten unix' xactfetch "$@"
rc=$?
if [ -n "${sspid}" ]; then
kill $sspid
fi
exit $rc

View File

@ -11,6 +11,7 @@ classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = [
"httpx~=0.27.0",
"playwright~=1.32",
"requests~=2.29.0",
]
@ -28,3 +29,7 @@ build-backend = "setuptools.build_meta"
[tool.pyright]
venvPath = '.'
venv = '.venv'
[tool.black]
line-length = 79
skip-string-normalization = true

204
secretsocket.py Executable file
View File

@ -0,0 +1,204 @@
#!/usr/bin/env python3
import asyncio
import logging
import os
import shlex
import signal
import socket
import struct
import tomllib
from pathlib import Path
from typing import Optional
log = logging.getLogger('secretsocket')
ALLOW_UNKNOWN_PEER = os.environ.get('ALLOW_UNKNOWN_PEER') == '1'
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
class Secret:
async def lookup(self) -> Optional[bytes]:
raise NotImplementedError
class EnvSecret(Secret):
def __init__(self, env_var: str) -> None:
self.env_var = env_var
async def lookup(self) -> Optional[bytes]:
return os.environb.get(self.env_var.encode('utf-8'))
class ExecSecret(Secret):
def __init__(self, cmd: str) -> None:
self.cmd = cmd
async def lookup(self) -> Optional[bytes]:
args = shlex.split(self.cmd)
proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
)
o = await proc.communicate()
return o[0]
class PathSecret(Secret):
def __init__(self, path: Path) -> None:
self.path = path
async def lookup(self) -> Optional[bytes]:
try:
f = self.path.expanduser().open('rb')
except OSError as e:
log.error('Failed to read secret from %s: %s', self.path, e)
return None
with f:
return await asyncio.to_thread(f.read)
class StringSecret(Secret):
def __init__(self, value: str) -> None:
self.value = value
async def lookup(self) -> Optional[bytes]:
return self.value.encode('utf-8')
class SecretServer:
def __init__(self, path: Optional[Path] = None) -> None:
self.path = path
async def handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
sock = writer.get_extra_info('socket')
pid, uid, gid = get_socket_peercred(sock)
except Exception as e:
log.error('Failed to get peer credentials: %s', e)
pid, uid, gid = None, None, None
log.debug('Client connected (pid %d, uid %d, gid %d)', pid, uid, gid)
if uid is None:
if ALLOW_UNKNOWN_PEER:
log.warning('Handling connection from unknown peer')
else:
log.error('Refusing to handle connection from unknown peer')
writer.close()
return
else:
my_uid = os.getresuid()[1]
if uid != my_uid:
log.error(
'Refusing to handle connection from PID %d: '
'peer UID %d does not match %d',
pid,
uid,
my_uid,
)
writer.close()
return
while 1:
try:
key = (await reader.readuntil(b'\n')).rstrip(b'\n').decode()
except asyncio.IncompleteReadError:
break
else:
log.info('Client %d requested secret %s', pid, key)
try:
secret = await self.get_secret(key)
except Exception:
log.exception('Failed to get secret:')
writer.close()
return
else:
writer.write(secret + b'\n')
await writer.drain()
log.debug('Client disconnected')
writer.close()
await writer.wait_closed()
async def get_secret(self, key: str) -> bytes:
secrets = await load_secrets(self.path)
if secret := secrets.get(key):
if value := await secret.lookup():
return value
else:
log.warning('Lookup of secret %s failed', key)
else:
log.warning('Unknown secret: %s', key)
return b''
def get_socket_peercred(sock: socket.socket) -> tuple[int, int, int]:
struct_ucred = '=iii'
buflen = struct.calcsize(struct_ucred)
cred = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, buflen)
return struct.unpack(struct_ucred, cred)
async def load_secrets(path: Optional[Path] = None) -> dict[str, Secret]:
if path is None:
path = Path('secrets.toml')
secrets = {}
try:
f = path.open('rb')
except OSError as e:
log.error('Failed to load secrets: %s', e)
return secrets
with f:
config = await asyncio.to_thread(tomllib.load, f)
for key, value in config.items():
if 'env' in value:
secrets[key] = EnvSecret(value['env'])
elif 'exec' in value:
secrets[key] = ExecSecret(value['exec'])
elif 'path' in value:
secrets[key] = PathSecret(Path(value['path']))
elif 'string' in value:
secrets[key] = StringSecret(value['string'])
else:
log.warning(
'Unsupported configuration for secret %s: %r', key, value
)
return secrets
def shutdown(signum, server):
log.info('Received signal %d, shutting down', signum)
server.close()
async def main():
logging.basicConfig(level=logging.DEBUG)
if SECRET_SOCKET_PATH:
sock_path = Path(SECRET_SOCKET_PATH)
elif XDG_RUNTIME_DIR:
sock_path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
else:
sock_path = Path('/tmp/.secretsocket')
if not sock_path.parent.exists():
sock_path.parent.mkdir()
if sock_path.exists():
sock_path.unlink()
ss = SecretServer()
server = await asyncio.start_unix_server(ss.handle_client, path=sock_path)
async with server:
await server.start_serving()
loop = asyncio.get_running_loop()
for signum in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(signum, shutdown, signum, server)
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())

View File

@ -1,30 +1,35 @@
import asyncio
import base64
import copy
import datetime
import getpass
import json
import logging
import os
import random
import sys
import subprocess
import urllib.parse
from pathlib import Path
from types import TracebackType
from typing import Any, Optional, Type
import requests
from playwright.sync_api import Page
from playwright.sync_api import sync_playwright
import httpx
from playwright.async_api import Playwright, Page
from playwright.async_api import async_playwright
log = logging.getLogger('xactfetch')
NTFY_URL = os.environ['NTFY_URL']
NTFY_TOPIC = os.environ['NTFY_TOPIC']
FIREFLY_III_URL = os.environ['FIREFLY_III_URL']
FIREFLY_III_IMPORTER_URL = os.environ['FIREFLY_IMPORT_URL']
NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'dustin')
FIREFLY_III_URL = os.environ.get(
'FIREFLY_III_URL', 'https://firefly.pyrocufflink.blue'
)
FIREFLY_III_IMPORTER_URL = os.environ.get(
'FIREFLY_IMPORT_URL', 'https://firefly-importer.pyrocufflink.blue'
)
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
ACCOUNTS = {
'commerce': {
@ -45,8 +50,9 @@ class FireflyImporter:
self.url = url
self.secret = secret
self.auth = auth
self.client = httpx.AsyncClient()
def import_csv(
async def import_csv(
self,
csv: Path,
config: dict[str, Any],
@ -54,24 +60,58 @@ class FireflyImporter:
log.debug('Importing transactions from %s to Firefly III', csv)
url = f'{self.url.rstrip("/")}/autoupload'
with csv.open('rb') as f:
r = requests.post(
async with httpx.AsyncClient(auth=self.auth) as client:
r = await client.post(
url,
auth=self.auth,
headers={
'Accept': 'application/json',
},
params={
'secret': self.secret,
},
headers={
'Accept': 'application/json',
},
files={
'importable': ('import.csv', f),
'json': ('import.json', json.dumps(config)),
},
timeout=300,
)
r.raise_for_status()
def ntfy(
class SecretsClient:
def __init__(self) -> None:
self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter]
async def __aenter__(self) -> 'SecretsClient':
if not hasattr(self, 'sock'):
await self.connect()
return self
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> bool:
self.sock[1].close()
return False
async def connect(self) -> None:
if SECRET_SOCKET_PATH:
path = Path(SECRET_SOCKET_PATH)
elif XDG_RUNTIME_DIR:
path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
else:
path = '.secretsocket'
self.sock = await asyncio.open_unix_connection(str(path))
async def get_secret(self, key: str) -> bytes:
self.sock[1].write(f'{key}\n'.encode('utf-8'))
buf = await self.sock[0].read(64 * 2**10)
return buf.rstrip(b'\n')
async def ntfy(
message: Optional[str] = None,
topic: str = NTFY_TOPIC,
title: Optional[str] = None,
@ -86,6 +126,7 @@ def ntfy(
if tags:
headers['Tags'] = tags
url = f'{NTFY_URL}/{topic}'
client = httpx.AsyncClient()
if attach:
if filename:
headers['Filename'] = filename
@ -97,71 +138,22 @@ def ntfy(
else:
message = message.replace('\n', '\\n')
headers['Message'] = message
r = requests.put(
async with client:
r = await client.put(
url,
headers=headers,
data=attach,
content=attach,
)
else:
r = requests.post(
async with client:
r = await client.post(
url,
headers=headers,
data=message,
content=message,
)
r.raise_for_status()
def rbw_unlocked() -> bool:
log.debug('Checking if rbw vault is locked')
cmd = ['rbw', 'unlocked']
p = subprocess.run(cmd, check=False, stdout=subprocess.DEVNULL)
unlocked = p.returncode == 0
log.info('rbw vault is %s', 'unlocked' if unlocked else 'locked')
return unlocked
def rbw_get(
name: str, folder: Optional[str] = None, username: Optional[str] = None
) -> str:
log.info(
'Getting password for Bitwarden vault item '
'%s (folder: %s, username: %s)',
name,
folder,
username,
)
cmd = ['rbw', 'get']
if folder is not None:
cmd += ('--folder', folder)
cmd.append(name)
if username is not None:
cmd.append(username)
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
assert p.stdout is not None
return p.stdout.rstrip('\n')
def rbw_code(
name: str, folder: Optional[str] = None, username: Optional[str] = None
) -> str:
log.info(
'Getting OTP code for Bitwarden vault item '
'%s (folder: %s, username: %s)',
name,
folder,
username,
)
cmd = ['rbw', 'code']
if folder is not None:
cmd += ('--folder', folder)
cmd.append(name)
if username is not None:
cmd.append(username)
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
assert p.stdout is not None
return p.stdout.rstrip('\n')
def rfc2047_base64encode(
message: str,
) -> str:
@ -169,21 +161,16 @@ def rfc2047_base64encode(
return f"=?UTF-8?B?{encoded}?="
def secret_from_file(env: str, default: str) -> str:
filename = os.environ.get(env, default)
log.debug('Loading secret value from %s', filename)
with open(filename, 'r', encoding='utf-8') as f:
return f.read().rstrip()
def get_last_transaction_date(key: int, token: str) -> datetime.date:
async def get_last_transaction_date(key: int, token: str) -> datetime.date:
url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions'
r = requests.get(
async with httpx.AsyncClient() as client:
r = await client.get(
url,
headers={
'Authorization': f'Bearer {token}',
'Accept': 'application/vnd.api+json',
},
timeout=10,
)
r.raise_for_status()
last_date = datetime.datetime.min
@ -204,13 +191,17 @@ def get_last_transaction_date(key: int, token: str) -> datetime.date:
return last_date.date()
def download_chase(
page: Page, end_date: datetime.date, token: str, importer: FireflyImporter
async def download_chase(
page: Page,
secrets: SecretsClient,
end_date: datetime.date,
token: str,
importer: FireflyImporter,
) -> bool:
with Chase(page) as c, ntfyerror('Chase', page) as r:
async with Chase(page, secrets) as c, ntfyerror('Chase', page) as r:
key = ACCOUNTS['chase']
try:
start_date = get_last_transaction_date(key, token)
start_date = await get_last_transaction_date(key, token)
except (OSError, ValueError) as e:
log.error(
'Skipping Chase account: could not get last transaction: %s',
@ -223,25 +214,28 @@ def download_chase(
start_date,
)
return True
c.login()
csv = c.download_transactions(start_date, end_date)
await c.login()
csv = await c.download_transactions(start_date, end_date)
log.info('Importing transactions from Chase into Firefly III')
c.firefly_import(csv, key, importer)
await c.firefly_import(csv, key, importer)
return r.success
def download_commerce(
async def download_commerce(
page: Page,
secrets: SecretsClient,
end_date: datetime.date,
token: str,
importer: FireflyImporter,
) -> bool:
log.info('Downloading transaction lists from Commerce Bank')
csvs = []
with CommerceBank(page) as c, ntfyerror('Commerce Bank', page) as r:
async with CommerceBank(page, secrets) as c, ntfyerror(
'Commerce Bank', page
) as r:
for name, key in ACCOUNTS['commerce'].items():
try:
start_date = get_last_transaction_date(key, token)
start_date = await get_last_transaction_date(key, token)
except (OSError, ValueError) as e:
log.error(
'Skipping account %s: could not get last transaction: %s',
@ -261,12 +255,14 @@ def download_commerce(
start_date,
name,
)
c.login()
c.open_account(name)
csvs.append((key, c.download_transactions(start_date, end_date)))
await c.login()
await c.open_account(name)
csvs.append(
(key, await c.download_transactions(start_date, end_date))
)
log.info('Importing transactions from Commerce Bank into Firefly III')
for key, csv in csvs:
c.firefly_import(csv, key, importer)
await c.firefly_import(csv, key, importer)
return r.success
@ -276,10 +272,10 @@ class ntfyerror:
self.page = page
self.success = True
def __enter__(self) -> 'ntfyerror':
async def __aenter__(self) -> 'ntfyerror':
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
@ -292,9 +288,13 @@ class ntfyerror:
)
if os.environ.get('DEBUG_NTFY', '1') == '0':
return True
if ss := self.page.screenshot():
save_screenshot(ss)
ntfy(
try:
if ss := await self.page.screenshot():
await asyncio.to_thread(save_screenshot, ss)
except Exception:
log.exception('Failed to get screenshot:')
ss = None
await ntfy(
message=str(exc_value),
title=f'xactfetch failed for {self.bank}',
tags='warning',
@ -367,107 +367,114 @@ class CommerceBank:
'ignore_duplicate_transactions': True,
}
def __init__(self, page: Page) -> None:
def __init__(self, page: Page, secrets: SecretsClient) -> None:
self.page = page
self.username = 'admiraln3mo'
self.vault_item = 'Commerce Bank'
self.vault_folder = 'Websites'
self.secrets = secrets
self._logged_in = False
def __enter__(self) -> 'CommerceBank':
async def __aenter__(self) -> 'CommerceBank':
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> None:
self.logout()
await self.logout()
def login(self) -> None:
async def login(self) -> None:
if self._logged_in:
return
log.debug('Navigating to %s', self.URL)
self.page.goto(self.URL)
password = rbw_get(self.vault_item, self.vault_folder, self.username)
await self.page.goto(self.URL)
username = await self.get_secret('bank.commerce.username')
password = await self.get_secret('bank.commerce.password')
log.debug('Filling username/password login form')
self.page.get_by_role('textbox', name='Customer ID').fill(
self.username
await self.page.get_by_role('textbox', name='Customer ID').fill(
username
)
self.page.get_by_role('textbox', name='Password').fill(password)
self.page.get_by_role('button', name='Log In').click()
await self.page.get_by_role('textbox', name='Password').fill(password)
await self.page.get_by_role('button', name='Log In').click()
log.debug('Waiting for OTP 2FA form')
otp_input = self.page.locator('id=securityCodeInput')
otp_input.wait_for()
self.page.wait_for_timeout(random.randint(1000, 3000))
await otp_input.wait_for()
await self.page.wait_for_timeout(random.randint(1000, 3000))
log.debug('Filling OTP 2FA form')
otp = rbw_code(self.vault_item, self.vault_folder, self.username)
otp_input.fill(otp)
with self.page.expect_event('load'):
self.page.get_by_role('button', name='Continue').click()
otp = await self.get_secret('bank.commerce.otp')
await otp_input.fill(otp)
async with self.page.expect_event('load'):
await self.page.get_by_role('button', name='Continue').click()
log.debug('Waiting for page load')
self.page.wait_for_load_state()
await self.page.wait_for_load_state()
cur_url = urllib.parse.urlparse(self.page.url)
if cur_url.path != '/CBI/Accounts/Summary':
new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='')
self.page.goto(urllib.parse.urlunparse(new_url))
await self.page.goto(urllib.parse.urlunparse(new_url))
log.info('Successfully logged in to Commerce Bank')
self._logged_in = True
def logout(self) -> None:
async def logout(self) -> None:
if not self._logged_in:
return
log.debug('Logging out of Commerce Bank')
with self.page.expect_event('load'):
self.page.get_by_test_id('navWrap').get_by_text('Logout').click()
async with self.page.expect_event('load'):
await self.page.get_by_test_id('navWrap').get_by_text(
'Logout'
).click()
log.info('Logged out of Commerce Bank')
def open_account(self, account: str) -> None:
async def open_account(self, account: str) -> None:
log.debug('Navigating to activity page for account %s', account)
if '/Activity/' in self.page.url:
self.page.get_by_role('button', name='My Accounts').click()
with self.page.expect_event('load'):
self.page.get_by_role('link', name=account).click()
self.page.wait_for_load_state()
self.page.wait_for_timeout(random.randint(1000, 3000))
await self.page.get_by_role('button', name='My Accounts').click()
async with self.page.expect_event('load'):
await self.page.get_by_role('link', name=account).click()
await self.page.wait_for_load_state()
await self.page.wait_for_timeout(random.randint(1000, 3000))
log.info('Loaded activity page for account %s', account)
def download_transactions(
async def download_transactions(
self, from_date: datetime.date, to_date: datetime.date
) -> Path:
log.info('Downloading transactions from %s to %s', from_date, to_date)
datefmt = '%m/%d/%Y'
self.page.get_by_role('link', name='Download Transactions').click()
self.page.wait_for_timeout(random.randint(750, 1250))
await self.page.get_by_role(
'link', name='Download Transactions'
).click()
await self.page.wait_for_timeout(random.randint(750, 1250))
modal = self.page.locator('#download-transactions')
input_from = modal.locator('input[data-qaid=fromDate]')
input_from.click()
self.page.keyboard.press('Control+A')
self.page.keyboard.press('Delete')
self.page.keyboard.type(from_date.strftime(datefmt))
await input_from.click()
await self.page.keyboard.press('Control+A')
await self.page.keyboard.press('Delete')
await self.page.keyboard.type(from_date.strftime(datefmt))
input_to = modal.locator('input[data-qaid=toDate]')
input_to.click()
self.page.keyboard.press('Control+A')
self.page.keyboard.press('Delete')
self.page.keyboard.type(to_date.strftime(datefmt))
modal.get_by_role('button', name='Select Type').click()
self.page.get_by_text('Comma Separated').click()
with self.page.expect_download() as di:
self.page.get_by_role('button', name='Download').click()
await input_to.click()
await self.page.keyboard.press('Control+A')
await self.page.keyboard.press('Delete')
await self.page.keyboard.type(to_date.strftime(datefmt))
await modal.get_by_role('button', name='Select Type').click()
await self.page.get_by_text('Comma Separated').click()
async with self.page.expect_download() as di:
await self.page.get_by_role('button', name='Download').click()
log.debug('Waiting for download to complete')
path = di.value.path()
path = await (await di.value).path()
assert path
log.info('Downloaded transactions to %s', path)
modal.get_by_label('Close').click()
await modal.get_by_label('Close').click()
return path
def firefly_import(
async def firefly_import(
self, csv: Path, account: int, importer: FireflyImporter
) -> None:
config = copy.deepcopy(self.IMPORT_CONFIG)
config['default_account'] = account
importer.import_csv(csv, config)
await importer.import_csv(csv, config)
async def get_secret(self, key: str) -> str:
secret = await self.secrets.get_secret(key)
return secret.decode()
class Chase:
@ -526,134 +533,163 @@ class Chase:
'ignore_duplicate_transactions': True,
}
def __init__(self, page: Page) -> None:
def __init__(self, page: Page, secrets: SecretsClient) -> None:
self.page = page
self.username = 'AdmiralN3mo'
self.vault_item = 'Chase'
self.vault_folder = 'Websites'
self.secrets = secrets
self.saved_cookies = Path('cookies.json')
self._logged_in = False
def __enter__(self) -> 'Chase':
self.load_cookies()
async def __aenter__(self) -> 'Chase':
await self.load_cookies()
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> None:
try:
self.logout()
await self.logout()
finally:
self.save_cookies()
await self.save_cookies()
def load_cookies(self) -> None:
async def load_cookies(self) -> None:
log.debug('Loading saved cookies from %s', self.saved_cookies)
try:
with self.saved_cookies.open(encoding='utf-8') as f:
self.page.context.add_cookies(json.load(f))
except:
log.warning(
'Could not load saved cookies, '
'SMS verification will be required!'
)
cookies = await asyncio.to_thread(json.load, f)
await self.page.context.add_cookies(cookies)
except Exception as e:
log.debug('Failed to load saved cookies: %s', e)
else:
log.info('Successfully loaded saved cookies')
def save_cookies(self) -> None:
async def save_cookies(self) -> None:
log.debug('Saving cookies from %s', self.saved_cookies)
try:
with self.saved_cookies.open('w', encoding='utf-8') as f:
f.write(json.dumps(self.page.context.cookies()))
cookies = await self.page.context.cookies()
f.write(await asyncio.to_thread(json.dumps, cookies))
except Exception as e:
log.error('Failed to save cookies: %s', e)
else:
log.info('Successfully saved cookies to %s', self.saved_cookies)
def login(self) -> None:
async def login(self) -> None:
if self._logged_in:
return
log.debug('Navigating to %s', self.URL)
self.page.goto(self.URL)
self.page.wait_for_load_state()
self.page.wait_for_timeout(random.randint(2000, 4000))
password = rbw_get(self.vault_item, self.vault_folder, self.username)
await self.page.goto(self.URL)
await self.page.wait_for_load_state()
await self.page.wait_for_timeout(random.randint(2000, 4000))
username = await self.get_secret('bank.chase.username')
password = await self.get_secret('bank.chase.password')
log.debug('Filling username/password login form')
self.page.frame_locator('#logonbox').get_by_label('Username').fill(
self.username
)
self.page.frame_locator('#logonbox').get_by_label('Password').fill(
password
)
self.page.wait_for_timeout(random.randint(500, 750))
self.page.frame_locator('#logonbox').get_by_role(
'button', name='Sign in'
).click()
logonbox = self.page.frame_locator('#logonbox')
await logonbox.get_by_label('Username').fill(username)
await logonbox.get_by_label('Password').fill(password)
await self.page.wait_for_timeout(random.randint(500, 750))
await logonbox.get_by_role('button', name='Sign in').click()
log.debug('Waiting for page load')
self.page.wait_for_load_state()
self.page.get_by_role('button', name='Pay Card').wait_for(
timeout=120000
await self.page.wait_for_load_state()
logonframe = self.page.frame_locator('iframe[title="logon"]')
t_2fa = asyncio.create_task(
logonframe.get_by_role(
'heading', name="We don't recognize this device"
).wait_for()
)
t_finished = asyncio.create_task(
self.page.get_by_role('button', name='Pay Card').wait_for()
)
done, pending = await asyncio.wait(
(t_2fa, t_finished),
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
for t in done:
await t
if t_2fa in done:
log.warning('Device verification (SMS 2-factor auth) required')
await logonframe.get_by_label('Tell us how: Choose one').click()
await logonframe.locator(
'#container-1-simplerAuth-dropdownoptions-styledselect'
).click()
otp_task = asyncio.create_task(self.get_secret('bank.chase.otp'))
await logonframe.get_by_role('button', name='Next').click()
log.info('Waiting for SMS verification code')
otp = await otp_task
log.debug('Filling verification code form')
await logonframe.get_by_label('One-time code').fill(otp)
await logonframe.get_by_label('Password').fill(password)
await logonframe.get_by_role('button', name='Next').click()
await self.page.wait_for_load_state()
await self.page.get_by_role('button', name='Pay Card').wait_for()
log.info('Successfully logged in to Chase')
self._logged_in = True
def download_transactions(
async def download_transactions(
self, from_date: datetime.date, to_date: datetime.date
) -> Path:
log.info('Downloading transactions from %s to %s', from_date, to_date)
fmt = '%m/%d/%Y'
self.page.locator('#CARD_ACCOUNTS').get_by_role(
await self.page.locator('#CARD_ACCOUNTS').get_by_role(
'button', name='(...2467)'
).first.click()
fl = self.page.locator('#flyout')
fl.wait_for()
fl.get_by_role('button', name='Pay card', exact=True).wait_for()
fl.get_by_role(
await fl.wait_for()
await fl.get_by_role('button', name='Pay card', exact=True).wait_for()
await fl.get_by_role(
'button', name='Account activity', exact=True
).wait_for()
fl.get_by_role('link', name='Show details').wait_for()
fl.get_by_role('link', name='Activity since last statement').click()
fl.get_by_role('link', name='All transactions').click()
fl.get_by_text('See more activity').wait_for()
fl.get_by_role('button', name='Download Account Activity').click()
await fl.get_by_role('link', name='Show details').wait_for()
await fl.get_by_role(
'link', name='Activity since last statement'
).click()
await fl.get_by_role('link', name='All transactions').click()
await fl.get_by_text('See more activity').wait_for()
await fl.get_by_role(
'button', name='Download Account Activity'
).click()
log.debug('Filling account activity download form')
self.page.locator('#select-downloadActivityOptionId-label').click()
self.page.get_by_text('Choose a date range').nth(1).locator(
await self.page.locator(
'#select-downloadActivityOptionId-label'
).click()
await self.page.get_by_text('Choose a date range').nth(1).locator(
'../..'
).click()
self.page.wait_for_timeout(random.randint(500, 1500))
self.page.locator('#accountActivityFromDate-input-input').fill(
await self.page.wait_for_timeout(random.randint(500, 1500))
await self.page.locator('#accountActivityFromDate-input-input').fill(
from_date.strftime(fmt)
)
self.page.locator('#accountActivityFromDate-input-input').blur()
self.page.wait_for_timeout(random.randint(500, 1500))
self.page.locator('#accountActivityToDate-input-input').fill(
await self.page.locator('#accountActivityFromDate-input-input').blur()
await self.page.wait_for_timeout(random.randint(500, 1500))
await self.page.locator('#accountActivityToDate-input-input').fill(
to_date.strftime(fmt)
)
self.page.locator('#accountActivityToDate-input-input').blur()
self.page.wait_for_timeout(random.randint(500, 1500))
with self.page.expect_download(timeout=5000) as di:
self.page.get_by_role(
await self.page.locator('#accountActivityToDate-input-input').blur()
await self.page.wait_for_timeout(random.randint(500, 1500))
async with self.page.expect_download(timeout=5000) as di:
await self.page.get_by_role(
'button', name='Download', exact=True
).click()
log.debug('Waiting for download to complete')
self.page.wait_for_timeout(random.randint(1000, 2500))
path = di.value.path()
await self.page.wait_for_timeout(random.randint(1000, 2500))
path = await (await di.value).path()
assert path
log.info('Downloaded transactions to %s', path)
return path
def logout(self) -> None:
async def logout(self) -> None:
if not self._logged_in:
return
log.debug('Logging out of Chase')
with self.page.expect_event('load'):
self.page.get_by_role('button', name='Sign out').click()
async with self.page.expect_event('load'):
await self.page.get_by_role('button', name='Sign out').click()
log.info('Logged out of Chase')
def firefly_import(
async def firefly_import(
self, csv: Path, account: int, importer: FireflyImporter
) -> None:
config = copy.deepcopy(self.IMPORT_CONFIG)
@ -668,37 +704,66 @@ class Chase:
config['do_mapping'].pop(0)
else:
raise ValueError(f'Unexpected CSV schema: {headers}')
importer.import_csv(csv, config)
await importer.import_csv(csv, config)
async def get_secret(self, key: str) -> str:
secret = await self.secrets.get_secret(key)
return secret.decode()
def main() -> None:
logging.basicConfig(level=logging.DEBUG)
log.debug('Getting Firefly III access token from rbw vault')
token = rbw_get('xactfetch')
import_secret = secret_from_file(
'FIREFLY_IMPORT_SECRET_FILE', 'import.secret'
)
async def fetch_transactions(pw: Playwright, secrets: SecretsClient) -> bool:
log.debug('Getting Firefly III access token')
token = (await secrets.get_secret('firefly.token')).decode()
import_secret = (
await secrets.get_secret('firefly.import.secret')
).decode()
import_auth = (
os.environ.get('FIREFLY_IMPORT_USER', getpass.getuser()),
secret_from_file('FIREFLY_IMPORT_PASSWORD_FILE', 'import.password'),
(await secrets.get_secret('firefly.import.username')).decode(),
(await secrets.get_secret('firefly.import.password')).decode(),
)
importer = FireflyImporter(
FIREFLY_III_IMPORTER_URL, import_secret, import_auth
)
end_date = datetime.date.today() - datetime.timedelta(days=1)
with sync_playwright() as pw:
headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1'
browser = pw.firefox.launch(headless=headless)
page = browser.new_page()
failed = False
browser = await pw.chromium.launch(headless=False)
context = await browser.new_context()
await context.tracing.start(screenshots=True, snapshots=True)
page = await context.new_page()
banks = sys.argv[1:] or list(ACCOUNTS.keys())
if 'commerce' in banks:
if not download_commerce(page, end_date, token, importer):
if not await download_commerce(
page, secrets, end_date, token, importer
):
failed = True
if 'chase' in banks:
if not download_chase(page, end_date, token, importer):
if not await download_chase(page, secrets, end_date, token, importer):
failed = True
if failed:
await context.tracing.stop(path='trace.zip')
with open('trace.zip', 'rb') as f:
await ntfy(
'Downloading one or more transaction lists failed.',
attach=f.read(),
filename='trace.zip',
)
return failed
async def amain() -> None:
logging.basicConfig(level=logging.DEBUG)
async with SecretsClient() as secrets:
try:
async with async_playwright() as pw:
failed = await fetch_transactions(pw, secrets)
raise SystemExit(1 if failed else 0)
except asyncio.exceptions.InvalidStateError as e:
log.debug('Ignoring exception: %s', e, exc_info=sys.exc_info())
def main():
asyncio.run(amain())
if __name__ == '__main__':