1
0
Fork 0

Compare commits

..

No commits in common. "bdcb8c93b6a265b1abb24ac7f0e8e4d0fb0c68f3" and "72eae4d5b3fb69df75a66489914c7029cef2155e" have entirely different histories.

8 changed files with 265 additions and 596 deletions

View File

@ -1,8 +1,5 @@
* *
!.git !.git
!chase2fa.py
!entrypoint.sh
!pinentry-stub.sh !pinentry-stub.sh
!pyproject.toml !pyproject.toml
!secretsocket.py
!xactfetch.py !xactfetch.py

1
.gitignore vendored
View File

@ -1,7 +1,6 @@
/.venv /.venv
/cookies.json /cookies.json
/screenshot_*.png /screenshot_*.png
/secrets.toml
*.egg-info/ *.egg-info/
__pycache__/ __pycache__/
*.py[co] *.py[co]

View File

@ -1,4 +1,4 @@
FROM git.pyrocufflink.net/containerimages/dch-base AS build FROM registry.fedoraproject.org/fedora-minimal:38 AS build
RUN --mount=type=cache,target=/var/cache \ RUN --mount=type=cache,target=/var/cache \
microdnf install -y \ microdnf install -y \
@ -18,22 +18,11 @@ RUN --mount=type=cache,target=/var/cache \
python3-wheel \ python3-wheel \
&& : && :
COPY .git /src/.git COPY . /src
COPY xactfetch.py pyproject.toml /src
RUN python3 -m pip wheel -w /wheels /src RUN python3 -m pip wheel -w /wheels /src
FROM registry.fedoraproject.org/fedora-minimal:38
FROM scratch AS mixin
COPY pinentry-stub.sh /usr/local/bin/pinentry-stub
COPY secretsocket.py /usr/local/bin/secretsocket
COPY chase2fa.py /usr/local/bin/chase2fa
COPY entrypoint.sh /entrypoint.sh
FROM git.pyrocufflink.net/containerimages/dch-base
RUN --mount=type=cache,target=/var/cache \ RUN --mount=type=cache,target=/var/cache \
microdnf install -y \ microdnf install -y \
@ -58,16 +47,11 @@ RUN --mount=type=cache,target=/var/cache \
libXrandr \ libXrandr \
libXrender \ libXrender \
libXtst \ libXtst \
libdrm \
libxcb \ libxcb \
mesa-libgbm \
nspr \
nss \
pango \ pango \
python3 \ python3 \
python3-pip \ python3-pip \
tini \ tini \
xorg-x11-server-Xvfb \
&& echo xactfetch:x:2468: >> /etc/group \ && echo xactfetch:x:2468: >> /etc/group \
&& echo xactfetch:*:2468:2468:xactfetch:/var/lib/xactfetch:/sbin/nologin >> /etc/passwd \ && echo xactfetch:*:2468:2468:xactfetch:/var/lib/xactfetch:/sbin/nologin >> /etc/passwd \
&& : && :
@ -77,15 +61,16 @@ ENV PLAYWRIGHT_BROWSERS_PATH=/usr/local/playwright/browsers
RUN --mount=type=bind,from=build,source=/,target=/build \ RUN --mount=type=bind,from=build,source=/,target=/build \
python3 -m pip install --no-index -f /build/wheels xactfetch \ python3 -m pip install --no-index -f /build/wheels xactfetch \
&& cp /build/root/.cargo/bin/rbw* /usr/local/bin/ \ && cp /build/root/.cargo/bin/rbw* /usr/local/bin/ \
&& playwright install chromium \ && install /build/src/pinentry-stub.sh /usr/local/bin/pinentry-stub \
&& playwright install firefox \
&& : && :
COPY --from=mixin / /
VOLUME /var/lib/xactfetch VOLUME /var/lib/xactfetch
WORKDIR /var/lib/xactfetch WORKDIR /var/lib/xactfetch
USER 2468:2468 USER 2468:2468
ENTRYPOINT ["/entrypoint.sh"] ENV XDG_CONFIG_HOME=/etc
ENTRYPOINT ["tini", "xactfetch", "--"]

View File

@ -1,19 +0,0 @@
#!/usr/bin/env python3
import re
import httpx
stream = httpx.stream(
'GET',
'https://ntfy.pyrocufflink.blue/chase2fa/raw',
timeout=httpx.Timeout(5, read=None),
)
with stream as r:
for line in r.iter_lines():
line = line.strip()
if not line:
continue
m = re.search(r'\d{4,}', line)
if m:
print(m.group(0))
break

View File

@ -1,19 +0,0 @@
#!/bin/sh
if [ $$ -eq 1 ]; then
exec tini "$0" -- "$@"
fi
if [ -z "${SECRET_SOCKET_PATH}" ] || [ ! -e "${SECRET_SOCKET_PATH}" ]; then
export SECRET_SOCKET_PATH="${SECRET_SOCKET_PATH:-/tmp/.secretsocket}"
secretsocket &
sspid=$!
fi
xvfb-run -e /dev/stderr -s '-screen 0 1920x1080x24 -nolisten unix' xactfetch "$@"
rc=$?
if [ -n "${sspid}" ]; then
kill $sspid
fi
exit $rc

View File

@ -11,7 +11,6 @@ classifiers = [
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
] ]
dependencies = [ dependencies = [
"httpx~=0.27.0",
"playwright~=1.32", "playwright~=1.32",
"requests~=2.29.0", "requests~=2.29.0",
] ]
@ -29,7 +28,3 @@ build-backend = "setuptools.build_meta"
[tool.pyright] [tool.pyright]
venvPath = '.' venvPath = '.'
venv = '.venv' venv = '.venv'
[tool.black]
line-length = 79
skip-string-normalization = true

View File

@ -1,204 +0,0 @@
#!/usr/bin/env python3
import asyncio
import logging
import os
import shlex
import signal
import socket
import struct
import tomllib
from pathlib import Path
from typing import Optional
log = logging.getLogger('secretsocket')
ALLOW_UNKNOWN_PEER = os.environ.get('ALLOW_UNKNOWN_PEER') == '1'
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
class Secret:
async def lookup(self) -> Optional[bytes]:
raise NotImplementedError
class EnvSecret(Secret):
def __init__(self, env_var: str) -> None:
self.env_var = env_var
async def lookup(self) -> Optional[bytes]:
return os.environb.get(self.env_var.encode('utf-8'))
class ExecSecret(Secret):
def __init__(self, cmd: str) -> None:
self.cmd = cmd
async def lookup(self) -> Optional[bytes]:
args = shlex.split(self.cmd)
proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
)
o = await proc.communicate()
return o[0]
class PathSecret(Secret):
def __init__(self, path: Path) -> None:
self.path = path
async def lookup(self) -> Optional[bytes]:
try:
f = self.path.expanduser().open('rb')
except OSError as e:
log.error('Failed to read secret from %s: %s', self.path, e)
return None
with f:
return await asyncio.to_thread(f.read)
class StringSecret(Secret):
def __init__(self, value: str) -> None:
self.value = value
async def lookup(self) -> Optional[bytes]:
return self.value.encode('utf-8')
class SecretServer:
def __init__(self, path: Optional[Path] = None) -> None:
self.path = path
async def handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
sock = writer.get_extra_info('socket')
pid, uid, gid = get_socket_peercred(sock)
except Exception as e:
log.error('Failed to get peer credentials: %s', e)
pid, uid, gid = None, None, None
log.debug('Client connected (pid %d, uid %d, gid %d)', pid, uid, gid)
if uid is None:
if ALLOW_UNKNOWN_PEER:
log.warning('Handling connection from unknown peer')
else:
log.error('Refusing to handle connection from unknown peer')
writer.close()
return
else:
my_uid = os.getresuid()[1]
if uid != my_uid:
log.error(
'Refusing to handle connection from PID %d: '
'peer UID %d does not match %d',
pid,
uid,
my_uid,
)
writer.close()
return
while 1:
try:
key = (await reader.readuntil(b'\n')).rstrip(b'\n').decode()
except asyncio.IncompleteReadError:
break
else:
log.info('Client %d requested secret %s', pid, key)
try:
secret = await self.get_secret(key)
except Exception:
log.exception('Failed to get secret:')
writer.close()
return
else:
writer.write(secret + b'\n')
await writer.drain()
log.debug('Client disconnected')
writer.close()
await writer.wait_closed()
async def get_secret(self, key: str) -> bytes:
secrets = await load_secrets(self.path)
if secret := secrets.get(key):
if value := await secret.lookup():
return value
else:
log.warning('Lookup of secret %s failed', key)
else:
log.warning('Unknown secret: %s', key)
return b''
def get_socket_peercred(sock: socket.socket) -> tuple[int, int, int]:
struct_ucred = '=iii'
buflen = struct.calcsize(struct_ucred)
cred = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, buflen)
return struct.unpack(struct_ucred, cred)
async def load_secrets(path: Optional[Path] = None) -> dict[str, Secret]:
if path is None:
path = Path('secrets.toml')
secrets = {}
try:
f = path.open('rb')
except OSError as e:
log.error('Failed to load secrets: %s', e)
return secrets
with f:
config = await asyncio.to_thread(tomllib.load, f)
for key, value in config.items():
if 'env' in value:
secrets[key] = EnvSecret(value['env'])
elif 'exec' in value:
secrets[key] = ExecSecret(value['exec'])
elif 'path' in value:
secrets[key] = PathSecret(Path(value['path']))
elif 'string' in value:
secrets[key] = StringSecret(value['string'])
else:
log.warning(
'Unsupported configuration for secret %s: %r', key, value
)
return secrets
def shutdown(signum, server):
log.info('Received signal %d, shutting down', signum)
server.close()
async def main():
logging.basicConfig(level=logging.DEBUG)
if SECRET_SOCKET_PATH:
sock_path = Path(SECRET_SOCKET_PATH)
elif XDG_RUNTIME_DIR:
sock_path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
else:
sock_path = Path('/tmp/.secretsocket')
if not sock_path.parent.exists():
sock_path.parent.mkdir()
if sock_path.exists():
sock_path.unlink()
ss = SecretServer()
server = await asyncio.start_unix_server(ss.handle_client, path=sock_path)
async with server:
await server.start_serving()
loop = asyncio.get_running_loop()
for signum in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(signum, shutdown, signum, server)
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())

View File

@ -1,35 +1,30 @@
import asyncio
import base64 import base64
import copy import copy
import datetime import datetime
import getpass
import json import json
import logging import logging
import os import os
import random import random
import sys import sys
import subprocess
import urllib.parse import urllib.parse
from pathlib import Path from pathlib import Path
from types import TracebackType from types import TracebackType
from typing import Any, Optional, Type from typing import Any, Optional, Type
import httpx import requests
from playwright.async_api import Playwright, Page from playwright.sync_api import Page
from playwright.async_api import async_playwright from playwright.sync_api import sync_playwright
log = logging.getLogger('xactfetch') log = logging.getLogger('xactfetch')
NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue') NTFY_URL = os.environ['NTFY_URL']
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'dustin') NTFY_TOPIC = os.environ['NTFY_TOPIC']
FIREFLY_III_URL = os.environ.get( FIREFLY_III_URL = os.environ['FIREFLY_III_URL']
'FIREFLY_III_URL', 'https://firefly.pyrocufflink.blue' FIREFLY_III_IMPORTER_URL = os.environ['FIREFLY_IMPORT_URL']
)
FIREFLY_III_IMPORTER_URL = os.environ.get(
'FIREFLY_IMPORT_URL', 'https://firefly-importer.pyrocufflink.blue'
)
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
ACCOUNTS = { ACCOUNTS = {
'commerce': { 'commerce': {
@ -50,9 +45,8 @@ class FireflyImporter:
self.url = url self.url = url
self.secret = secret self.secret = secret
self.auth = auth self.auth = auth
self.client = httpx.AsyncClient()
async def import_csv( def import_csv(
self, self,
csv: Path, csv: Path,
config: dict[str, Any], config: dict[str, Any],
@ -60,58 +54,24 @@ class FireflyImporter:
log.debug('Importing transactions from %s to Firefly III', csv) log.debug('Importing transactions from %s to Firefly III', csv)
url = f'{self.url.rstrip("/")}/autoupload' url = f'{self.url.rstrip("/")}/autoupload'
with csv.open('rb') as f: with csv.open('rb') as f:
async with httpx.AsyncClient(auth=self.auth) as client: r = requests.post(
r = await client.post(
url, url,
params={ auth=self.auth,
'secret': self.secret,
},
headers={ headers={
'Accept': 'application/json', 'Accept': 'application/json',
}, },
params={
'secret': self.secret,
},
files={ files={
'importable': ('import.csv', f), 'importable': ('import.csv', f),
'json': ('import.json', json.dumps(config)), 'json': ('import.json', json.dumps(config)),
}, },
timeout=300,
) )
r.raise_for_status() r.raise_for_status()
class SecretsClient: def ntfy(
def __init__(self) -> None:
self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter]
async def __aenter__(self) -> 'SecretsClient':
if not hasattr(self, 'sock'):
await self.connect()
return self
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> bool:
self.sock[1].close()
return False
async def connect(self) -> None:
if SECRET_SOCKET_PATH:
path = Path(SECRET_SOCKET_PATH)
elif XDG_RUNTIME_DIR:
path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
else:
path = '.secretsocket'
self.sock = await asyncio.open_unix_connection(str(path))
async def get_secret(self, key: str) -> bytes:
self.sock[1].write(f'{key}\n'.encode('utf-8'))
buf = await self.sock[0].read(64 * 2**10)
return buf.rstrip(b'\n')
async def ntfy(
message: Optional[str] = None, message: Optional[str] = None,
topic: str = NTFY_TOPIC, topic: str = NTFY_TOPIC,
title: Optional[str] = None, title: Optional[str] = None,
@ -126,7 +86,6 @@ async def ntfy(
if tags: if tags:
headers['Tags'] = tags headers['Tags'] = tags
url = f'{NTFY_URL}/{topic}' url = f'{NTFY_URL}/{topic}'
client = httpx.AsyncClient()
if attach: if attach:
if filename: if filename:
headers['Filename'] = filename headers['Filename'] = filename
@ -138,22 +97,71 @@ async def ntfy(
else: else:
message = message.replace('\n', '\\n') message = message.replace('\n', '\\n')
headers['Message'] = message headers['Message'] = message
async with client: r = requests.put(
r = await client.put(
url, url,
headers=headers, headers=headers,
content=attach, data=attach,
) )
else: else:
async with client: r = requests.post(
r = await client.post(
url, url,
headers=headers, headers=headers,
content=message, data=message,
) )
r.raise_for_status() r.raise_for_status()
def rbw_unlocked() -> bool:
log.debug('Checking if rbw vault is locked')
cmd = ['rbw', 'unlocked']
p = subprocess.run(cmd, check=False, stdout=subprocess.DEVNULL)
unlocked = p.returncode == 0
log.info('rbw vault is %s', 'unlocked' if unlocked else 'locked')
return unlocked
def rbw_get(
name: str, folder: Optional[str] = None, username: Optional[str] = None
) -> str:
log.info(
'Getting password for Bitwarden vault item '
'%s (folder: %s, username: %s)',
name,
folder,
username,
)
cmd = ['rbw', 'get']
if folder is not None:
cmd += ('--folder', folder)
cmd.append(name)
if username is not None:
cmd.append(username)
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
assert p.stdout is not None
return p.stdout.rstrip('\n')
def rbw_code(
name: str, folder: Optional[str] = None, username: Optional[str] = None
) -> str:
log.info(
'Getting OTP code for Bitwarden vault item '
'%s (folder: %s, username: %s)',
name,
folder,
username,
)
cmd = ['rbw', 'code']
if folder is not None:
cmd += ('--folder', folder)
cmd.append(name)
if username is not None:
cmd.append(username)
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
assert p.stdout is not None
return p.stdout.rstrip('\n')
def rfc2047_base64encode( def rfc2047_base64encode(
message: str, message: str,
) -> str: ) -> str:
@ -161,16 +169,21 @@ def rfc2047_base64encode(
return f"=?UTF-8?B?{encoded}?=" return f"=?UTF-8?B?{encoded}?="
async def get_last_transaction_date(key: int, token: str) -> datetime.date: def secret_from_file(env: str, default: str) -> str:
filename = os.environ.get(env, default)
log.debug('Loading secret value from %s', filename)
with open(filename, 'r', encoding='utf-8') as f:
return f.read().rstrip()
def get_last_transaction_date(key: int, token: str) -> datetime.date:
url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions' url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions'
async with httpx.AsyncClient() as client: r = requests.get(
r = await client.get(
url, url,
headers={ headers={
'Authorization': f'Bearer {token}', 'Authorization': f'Bearer {token}',
'Accept': 'application/vnd.api+json', 'Accept': 'application/vnd.api+json',
}, },
timeout=10,
) )
r.raise_for_status() r.raise_for_status()
last_date = datetime.datetime.min last_date = datetime.datetime.min
@ -191,17 +204,13 @@ async def get_last_transaction_date(key: int, token: str) -> datetime.date:
return last_date.date() return last_date.date()
async def download_chase( def download_chase(
page: Page, page: Page, end_date: datetime.date, token: str, importer: FireflyImporter
secrets: SecretsClient,
end_date: datetime.date,
token: str,
importer: FireflyImporter,
) -> bool: ) -> bool:
async with Chase(page, secrets) as c, ntfyerror('Chase', page) as r: with Chase(page) as c, ntfyerror('Chase', page) as r:
key = ACCOUNTS['chase'] key = ACCOUNTS['chase']
try: try:
start_date = await get_last_transaction_date(key, token) start_date = get_last_transaction_date(key, token)
except (OSError, ValueError) as e: except (OSError, ValueError) as e:
log.error( log.error(
'Skipping Chase account: could not get last transaction: %s', 'Skipping Chase account: could not get last transaction: %s',
@ -214,28 +223,25 @@ async def download_chase(
start_date, start_date,
) )
return True return True
await c.login() c.login()
csv = await c.download_transactions(start_date, end_date) csv = c.download_transactions(start_date, end_date)
log.info('Importing transactions from Chase into Firefly III') log.info('Importing transactions from Chase into Firefly III')
await c.firefly_import(csv, key, importer) c.firefly_import(csv, key, importer)
return r.success return r.success
async def download_commerce( def download_commerce(
page: Page, page: Page,
secrets: SecretsClient,
end_date: datetime.date, end_date: datetime.date,
token: str, token: str,
importer: FireflyImporter, importer: FireflyImporter,
) -> bool: ) -> bool:
log.info('Downloading transaction lists from Commerce Bank') log.info('Downloading transaction lists from Commerce Bank')
csvs = [] csvs = []
async with CommerceBank(page, secrets) as c, ntfyerror( with CommerceBank(page) as c, ntfyerror('Commerce Bank', page) as r:
'Commerce Bank', page
) as r:
for name, key in ACCOUNTS['commerce'].items(): for name, key in ACCOUNTS['commerce'].items():
try: try:
start_date = await get_last_transaction_date(key, token) start_date = get_last_transaction_date(key, token)
except (OSError, ValueError) as e: except (OSError, ValueError) as e:
log.error( log.error(
'Skipping account %s: could not get last transaction: %s', 'Skipping account %s: could not get last transaction: %s',
@ -255,14 +261,12 @@ async def download_commerce(
start_date, start_date,
name, name,
) )
await c.login() c.login()
await c.open_account(name) c.open_account(name)
csvs.append( csvs.append((key, c.download_transactions(start_date, end_date)))
(key, await c.download_transactions(start_date, end_date))
)
log.info('Importing transactions from Commerce Bank into Firefly III') log.info('Importing transactions from Commerce Bank into Firefly III')
for key, csv in csvs: for key, csv in csvs:
await c.firefly_import(csv, key, importer) c.firefly_import(csv, key, importer)
return r.success return r.success
@ -272,10 +276,10 @@ class ntfyerror:
self.page = page self.page = page
self.success = True self.success = True
async def __aenter__(self) -> 'ntfyerror': def __enter__(self) -> 'ntfyerror':
return self return self
async def __aexit__( def __exit__(
self, self,
exc_type: Optional[Type[Exception]], exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception], exc_value: Optional[Exception],
@ -288,13 +292,9 @@ class ntfyerror:
) )
if os.environ.get('DEBUG_NTFY', '1') == '0': if os.environ.get('DEBUG_NTFY', '1') == '0':
return True return True
try: if ss := self.page.screenshot():
if ss := await self.page.screenshot(): save_screenshot(ss)
await asyncio.to_thread(save_screenshot, ss) ntfy(
except Exception:
log.exception('Failed to get screenshot:')
ss = None
await ntfy(
message=str(exc_value), message=str(exc_value),
title=f'xactfetch failed for {self.bank}', title=f'xactfetch failed for {self.bank}',
tags='warning', tags='warning',
@ -367,114 +367,107 @@ class CommerceBank:
'ignore_duplicate_transactions': True, 'ignore_duplicate_transactions': True,
} }
def __init__(self, page: Page, secrets: SecretsClient) -> None: def __init__(self, page: Page) -> None:
self.page = page self.page = page
self.secrets = secrets self.username = 'admiraln3mo'
self.vault_item = 'Commerce Bank'
self.vault_folder = 'Websites'
self._logged_in = False self._logged_in = False
async def __aenter__(self) -> 'CommerceBank': def __enter__(self) -> 'CommerceBank':
return self return self
async def __aexit__( def __exit__(
self, self,
exc_type: Optional[Type[Exception]], exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception], exc_value: Optional[Exception],
tb: Optional[TracebackType], tb: Optional[TracebackType],
) -> None: ) -> None:
await self.logout() self.logout()
async def login(self) -> None: def login(self) -> None:
if self._logged_in: if self._logged_in:
return return
log.debug('Navigating to %s', self.URL) log.debug('Navigating to %s', self.URL)
await self.page.goto(self.URL) self.page.goto(self.URL)
username = await self.get_secret('bank.commerce.username') password = rbw_get(self.vault_item, self.vault_folder, self.username)
password = await self.get_secret('bank.commerce.password')
log.debug('Filling username/password login form') log.debug('Filling username/password login form')
await self.page.get_by_role('textbox', name='Customer ID').fill( self.page.get_by_role('textbox', name='Customer ID').fill(
username self.username
) )
await self.page.get_by_role('textbox', name='Password').fill(password) self.page.get_by_role('textbox', name='Password').fill(password)
await self.page.get_by_role('button', name='Log In').click() self.page.get_by_role('button', name='Log In').click()
log.debug('Waiting for OTP 2FA form') log.debug('Waiting for OTP 2FA form')
otp_input = self.page.locator('id=securityCodeInput') otp_input = self.page.locator('id=securityCodeInput')
await otp_input.wait_for() otp_input.wait_for()
await self.page.wait_for_timeout(random.randint(1000, 3000)) self.page.wait_for_timeout(random.randint(1000, 3000))
log.debug('Filling OTP 2FA form') log.debug('Filling OTP 2FA form')
otp = await self.get_secret('bank.commerce.otp') otp = rbw_code(self.vault_item, self.vault_folder, self.username)
await otp_input.fill(otp) otp_input.fill(otp)
async with self.page.expect_event('load'): with self.page.expect_event('load'):
await self.page.get_by_role('button', name='Continue').click() self.page.get_by_role('button', name='Continue').click()
log.debug('Waiting for page load') log.debug('Waiting for page load')
await self.page.wait_for_load_state() self.page.wait_for_load_state()
cur_url = urllib.parse.urlparse(self.page.url) cur_url = urllib.parse.urlparse(self.page.url)
if cur_url.path != '/CBI/Accounts/Summary': if cur_url.path != '/CBI/Accounts/Summary':
new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='') new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='')
await self.page.goto(urllib.parse.urlunparse(new_url)) self.page.goto(urllib.parse.urlunparse(new_url))
log.info('Successfully logged in to Commerce Bank') log.info('Successfully logged in to Commerce Bank')
self._logged_in = True self._logged_in = True
async def logout(self) -> None: def logout(self) -> None:
if not self._logged_in: if not self._logged_in:
return return
log.debug('Logging out of Commerce Bank') log.debug('Logging out of Commerce Bank')
async with self.page.expect_event('load'): with self.page.expect_event('load'):
await self.page.get_by_test_id('navWrap').get_by_text( self.page.get_by_test_id('navWrap').get_by_text('Logout').click()
'Logout'
).click()
log.info('Logged out of Commerce Bank') log.info('Logged out of Commerce Bank')
async def open_account(self, account: str) -> None: def open_account(self, account: str) -> None:
log.debug('Navigating to activity page for account %s', account) log.debug('Navigating to activity page for account %s', account)
if '/Activity/' in self.page.url: if '/Activity/' in self.page.url:
await self.page.get_by_role('button', name='My Accounts').click() self.page.get_by_role('button', name='My Accounts').click()
async with self.page.expect_event('load'): with self.page.expect_event('load'):
await self.page.get_by_role('link', name=account).click() self.page.get_by_role('link', name=account).click()
await self.page.wait_for_load_state() self.page.wait_for_load_state()
await self.page.wait_for_timeout(random.randint(1000, 3000)) self.page.wait_for_timeout(random.randint(1000, 3000))
log.info('Loaded activity page for account %s', account) log.info('Loaded activity page for account %s', account)
async def download_transactions( def download_transactions(
self, from_date: datetime.date, to_date: datetime.date self, from_date: datetime.date, to_date: datetime.date
) -> Path: ) -> Path:
log.info('Downloading transactions from %s to %s', from_date, to_date) log.info('Downloading transactions from %s to %s', from_date, to_date)
datefmt = '%m/%d/%Y' datefmt = '%m/%d/%Y'
await self.page.get_by_role( self.page.get_by_role('link', name='Download Transactions').click()
'link', name='Download Transactions' self.page.wait_for_timeout(random.randint(750, 1250))
).click()
await self.page.wait_for_timeout(random.randint(750, 1250))
modal = self.page.locator('#download-transactions') modal = self.page.locator('#download-transactions')
input_from = modal.locator('input[data-qaid=fromDate]') input_from = modal.locator('input[data-qaid=fromDate]')
await input_from.click() input_from.click()
await self.page.keyboard.press('Control+A') self.page.keyboard.press('Control+A')
await self.page.keyboard.press('Delete') self.page.keyboard.press('Delete')
await self.page.keyboard.type(from_date.strftime(datefmt)) self.page.keyboard.type(from_date.strftime(datefmt))
input_to = modal.locator('input[data-qaid=toDate]') input_to = modal.locator('input[data-qaid=toDate]')
await input_to.click() input_to.click()
await self.page.keyboard.press('Control+A') self.page.keyboard.press('Control+A')
await self.page.keyboard.press('Delete') self.page.keyboard.press('Delete')
await self.page.keyboard.type(to_date.strftime(datefmt)) self.page.keyboard.type(to_date.strftime(datefmt))
await modal.get_by_role('button', name='Select Type').click() modal.get_by_role('button', name='Select Type').click()
await self.page.get_by_text('Comma Separated').click() self.page.get_by_text('Comma Separated').click()
async with self.page.expect_download() as di: with self.page.expect_download() as di:
await self.page.get_by_role('button', name='Download').click() self.page.get_by_role('button', name='Download').click()
log.debug('Waiting for download to complete') log.debug('Waiting for download to complete')
path = await (await di.value).path() path = di.value.path()
assert path assert path
log.info('Downloaded transactions to %s', path) log.info('Downloaded transactions to %s', path)
await modal.get_by_label('Close').click() modal.get_by_label('Close').click()
return path return path
async def firefly_import( def firefly_import(
self, csv: Path, account: int, importer: FireflyImporter self, csv: Path, account: int, importer: FireflyImporter
) -> None: ) -> None:
config = copy.deepcopy(self.IMPORT_CONFIG) config = copy.deepcopy(self.IMPORT_CONFIG)
config['default_account'] = account config['default_account'] = account
await importer.import_csv(csv, config) importer.import_csv(csv, config)
async def get_secret(self, key: str) -> str:
secret = await self.secrets.get_secret(key)
return secret.decode()
class Chase: class Chase:
@ -533,163 +526,134 @@ class Chase:
'ignore_duplicate_transactions': True, 'ignore_duplicate_transactions': True,
} }
def __init__(self, page: Page, secrets: SecretsClient) -> None: def __init__(self, page: Page) -> None:
self.page = page self.page = page
self.secrets = secrets self.username = 'AdmiralN3mo'
self.vault_item = 'Chase'
self.vault_folder = 'Websites'
self.saved_cookies = Path('cookies.json') self.saved_cookies = Path('cookies.json')
self._logged_in = False self._logged_in = False
async def __aenter__(self) -> 'Chase': def __enter__(self) -> 'Chase':
await self.load_cookies() self.load_cookies()
return self return self
async def __aexit__( def __exit__(
self, self,
exc_type: Optional[Type[Exception]], exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception], exc_value: Optional[Exception],
tb: Optional[TracebackType], tb: Optional[TracebackType],
) -> None: ) -> None:
try: try:
await self.logout() self.logout()
finally: finally:
await self.save_cookies() self.save_cookies()
async def load_cookies(self) -> None: def load_cookies(self) -> None:
log.debug('Loading saved cookies from %s', self.saved_cookies) log.debug('Loading saved cookies from %s', self.saved_cookies)
try: try:
with self.saved_cookies.open(encoding='utf-8') as f: with self.saved_cookies.open(encoding='utf-8') as f:
cookies = await asyncio.to_thread(json.load, f) self.page.context.add_cookies(json.load(f))
await self.page.context.add_cookies(cookies) except:
except Exception as e: log.warning(
log.debug('Failed to load saved cookies: %s', e) 'Could not load saved cookies, '
'SMS verification will be required!'
)
else: else:
log.info('Successfully loaded saved cookies') log.info('Successfully loaded saved cookies')
async def save_cookies(self) -> None: def save_cookies(self) -> None:
log.debug('Saving cookies from %s', self.saved_cookies) log.debug('Saving cookies from %s', self.saved_cookies)
try: try:
with self.saved_cookies.open('w', encoding='utf-8') as f: with self.saved_cookies.open('w', encoding='utf-8') as f:
cookies = await self.page.context.cookies() f.write(json.dumps(self.page.context.cookies()))
f.write(await asyncio.to_thread(json.dumps, cookies))
except Exception as e: except Exception as e:
log.error('Failed to save cookies: %s', e) log.error('Failed to save cookies: %s', e)
else: else:
log.info('Successfully saved cookies to %s', self.saved_cookies) log.info('Successfully saved cookies to %s', self.saved_cookies)
async def login(self) -> None: def login(self) -> None:
if self._logged_in: if self._logged_in:
return return
log.debug('Navigating to %s', self.URL) log.debug('Navigating to %s', self.URL)
await self.page.goto(self.URL) self.page.goto(self.URL)
await self.page.wait_for_load_state() self.page.wait_for_load_state()
await self.page.wait_for_timeout(random.randint(2000, 4000)) self.page.wait_for_timeout(random.randint(2000, 4000))
username = await self.get_secret('bank.chase.username') password = rbw_get(self.vault_item, self.vault_folder, self.username)
password = await self.get_secret('bank.chase.password')
log.debug('Filling username/password login form') log.debug('Filling username/password login form')
logonbox = self.page.frame_locator('#logonbox') self.page.frame_locator('#logonbox').get_by_label('Username').fill(
await logonbox.get_by_label('Username').fill(username) self.username
await logonbox.get_by_label('Password').fill(password)
await self.page.wait_for_timeout(random.randint(500, 750))
await logonbox.get_by_role('button', name='Sign in').click()
log.debug('Waiting for page load')
await self.page.wait_for_load_state()
logonframe = self.page.frame_locator('iframe[title="logon"]')
t_2fa = asyncio.create_task(
logonframe.get_by_role(
'heading', name="We don't recognize this device"
).wait_for()
) )
t_finished = asyncio.create_task( self.page.frame_locator('#logonbox').get_by_label('Password').fill(
self.page.get_by_role('button', name='Pay Card').wait_for() password
) )
done, pending = await asyncio.wait( self.page.wait_for_timeout(random.randint(500, 750))
(t_2fa, t_finished), self.page.frame_locator('#logonbox').get_by_role(
return_when=asyncio.FIRST_COMPLETED, 'button', name='Sign in'
)
for t in pending:
t.cancel()
for t in done:
await t
if t_2fa in done:
log.warning('Device verification (SMS 2-factor auth) required')
await logonframe.get_by_label('Tell us how: Choose one').click()
await logonframe.locator(
'#container-1-simplerAuth-dropdownoptions-styledselect'
).click() ).click()
otp_task = asyncio.create_task(self.get_secret('bank.chase.otp')) log.debug('Waiting for page load')
await logonframe.get_by_role('button', name='Next').click() self.page.wait_for_load_state()
log.info('Waiting for SMS verification code') self.page.get_by_role('button', name='Pay Card').wait_for(
otp = await otp_task timeout=120000
log.debug('Filling verification code form') )
await logonframe.get_by_label('One-time code').fill(otp)
await logonframe.get_by_label('Password').fill(password)
await logonframe.get_by_role('button', name='Next').click()
await self.page.wait_for_load_state()
await self.page.get_by_role('button', name='Pay Card').wait_for()
log.info('Successfully logged in to Chase') log.info('Successfully logged in to Chase')
self._logged_in = True self._logged_in = True
async def download_transactions( def download_transactions(
self, from_date: datetime.date, to_date: datetime.date self, from_date: datetime.date, to_date: datetime.date
) -> Path: ) -> Path:
log.info('Downloading transactions from %s to %s', from_date, to_date) log.info('Downloading transactions from %s to %s', from_date, to_date)
fmt = '%m/%d/%Y' fmt = '%m/%d/%Y'
await self.page.locator('#CARD_ACCOUNTS').get_by_role( self.page.locator('#CARD_ACCOUNTS').get_by_role(
'button', name='(...2467)' 'button', name='(...2467)'
).first.click() ).first.click()
fl = self.page.locator('#flyout') fl = self.page.locator('#flyout')
await fl.wait_for() fl.wait_for()
await fl.get_by_role('button', name='Pay card', exact=True).wait_for() fl.get_by_role('button', name='Pay card', exact=True).wait_for()
await fl.get_by_role( fl.get_by_role(
'button', name='Account activity', exact=True 'button', name='Account activity', exact=True
).wait_for() ).wait_for()
await fl.get_by_role('link', name='Show details').wait_for() fl.get_by_role('link', name='Show details').wait_for()
await fl.get_by_role( fl.get_by_role('link', name='Activity since last statement').click()
'link', name='Activity since last statement' fl.get_by_role('link', name='All transactions').click()
).click() fl.get_by_text('See more activity').wait_for()
await fl.get_by_role('link', name='All transactions').click() fl.get_by_role('button', name='Download Account Activity').click()
await fl.get_by_text('See more activity').wait_for()
await fl.get_by_role(
'button', name='Download Account Activity'
).click()
log.debug('Filling account activity download form') log.debug('Filling account activity download form')
await self.page.locator( self.page.locator('#select-downloadActivityOptionId-label').click()
'#select-downloadActivityOptionId-label' self.page.get_by_text('Choose a date range').nth(1).locator(
).click()
await self.page.get_by_text('Choose a date range').nth(1).locator(
'../..' '../..'
).click() ).click()
await self.page.wait_for_timeout(random.randint(500, 1500)) self.page.wait_for_timeout(random.randint(500, 1500))
await self.page.locator('#accountActivityFromDate-input-input').fill( self.page.locator('#accountActivityFromDate-input-input').fill(
from_date.strftime(fmt) from_date.strftime(fmt)
) )
await self.page.locator('#accountActivityFromDate-input-input').blur() self.page.locator('#accountActivityFromDate-input-input').blur()
await self.page.wait_for_timeout(random.randint(500, 1500)) self.page.wait_for_timeout(random.randint(500, 1500))
await self.page.locator('#accountActivityToDate-input-input').fill( self.page.locator('#accountActivityToDate-input-input').fill(
to_date.strftime(fmt) to_date.strftime(fmt)
) )
await self.page.locator('#accountActivityToDate-input-input').blur() self.page.locator('#accountActivityToDate-input-input').blur()
await self.page.wait_for_timeout(random.randint(500, 1500)) self.page.wait_for_timeout(random.randint(500, 1500))
async with self.page.expect_download(timeout=5000) as di: with self.page.expect_download(timeout=5000) as di:
await self.page.get_by_role( self.page.get_by_role(
'button', name='Download', exact=True 'button', name='Download', exact=True
).click() ).click()
log.debug('Waiting for download to complete') log.debug('Waiting for download to complete')
await self.page.wait_for_timeout(random.randint(1000, 2500)) self.page.wait_for_timeout(random.randint(1000, 2500))
path = await (await di.value).path() path = di.value.path()
assert path assert path
log.info('Downloaded transactions to %s', path) log.info('Downloaded transactions to %s', path)
return path return path
async def logout(self) -> None: def logout(self) -> None:
if not self._logged_in: if not self._logged_in:
return return
log.debug('Logging out of Chase') log.debug('Logging out of Chase')
async with self.page.expect_event('load'): with self.page.expect_event('load'):
await self.page.get_by_role('button', name='Sign out').click() self.page.get_by_role('button', name='Sign out').click()
log.info('Logged out of Chase') log.info('Logged out of Chase')
async def firefly_import( def firefly_import(
self, csv: Path, account: int, importer: FireflyImporter self, csv: Path, account: int, importer: FireflyImporter
) -> None: ) -> None:
config = copy.deepcopy(self.IMPORT_CONFIG) config = copy.deepcopy(self.IMPORT_CONFIG)
@ -704,66 +668,37 @@ class Chase:
config['do_mapping'].pop(0) config['do_mapping'].pop(0)
else: else:
raise ValueError(f'Unexpected CSV schema: {headers}') raise ValueError(f'Unexpected CSV schema: {headers}')
await importer.import_csv(csv, config) importer.import_csv(csv, config)
async def get_secret(self, key: str) -> str:
secret = await self.secrets.get_secret(key)
return secret.decode()
async def fetch_transactions(pw: Playwright, secrets: SecretsClient) -> bool: def main() -> None:
log.debug('Getting Firefly III access token') logging.basicConfig(level=logging.DEBUG)
token = (await secrets.get_secret('firefly.token')).decode() log.debug('Getting Firefly III access token from rbw vault')
import_secret = ( token = rbw_get('xactfetch')
await secrets.get_secret('firefly.import.secret') import_secret = secret_from_file(
).decode() 'FIREFLY_IMPORT_SECRET_FILE', 'import.secret'
)
import_auth = ( import_auth = (
(await secrets.get_secret('firefly.import.username')).decode(), os.environ.get('FIREFLY_IMPORT_USER', getpass.getuser()),
(await secrets.get_secret('firefly.import.password')).decode(), secret_from_file('FIREFLY_IMPORT_PASSWORD_FILE', 'import.password'),
) )
importer = FireflyImporter( importer = FireflyImporter(
FIREFLY_III_IMPORTER_URL, import_secret, import_auth FIREFLY_III_IMPORTER_URL, import_secret, import_auth
) )
end_date = datetime.date.today() - datetime.timedelta(days=1) end_date = datetime.date.today() - datetime.timedelta(days=1)
with sync_playwright() as pw:
headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1'
browser = pw.firefox.launch(headless=headless)
page = browser.new_page()
failed = False failed = False
browser = await pw.chromium.launch(headless=False)
context = await browser.new_context()
await context.tracing.start(screenshots=True, snapshots=True)
page = await context.new_page()
banks = sys.argv[1:] or list(ACCOUNTS.keys()) banks = sys.argv[1:] or list(ACCOUNTS.keys())
if 'commerce' in banks: if 'commerce' in banks:
if not await download_commerce( if not download_commerce(page, end_date, token, importer):
page, secrets, end_date, token, importer
):
failed = True failed = True
if 'chase' in banks: if 'chase' in banks:
if not await download_chase(page, secrets, end_date, token, importer): if not download_chase(page, end_date, token, importer):
failed = True failed = True
if failed:
await context.tracing.stop(path='trace.zip')
with open('trace.zip', 'rb') as f:
await ntfy(
'Downloading one or more transaction lists failed.',
attach=f.read(),
filename='trace.zip',
)
return failed
async def amain() -> None:
logging.basicConfig(level=logging.DEBUG)
async with SecretsClient() as secrets:
try:
async with async_playwright() as pw:
failed = await fetch_transactions(pw, secrets)
raise SystemExit(1 if failed else 0) raise SystemExit(1 if failed else 0)
except asyncio.exceptions.InvalidStateError as e:
log.debug('Ignoring exception: %s', e, exc_info=sys.exc_info())
def main():
asyncio.run(amain())
if __name__ == '__main__': if __name__ == '__main__':