diff --git a/pyproject.toml b/pyproject.toml index 0edcdd8..87d020f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ classifiers = [ "Programming Language :: Python :: 3", ] dependencies = [ + "httpx~=0.27.0", "playwright~=1.32", "requests~=2.29.0", ] @@ -28,3 +29,7 @@ build-backend = "setuptools.build_meta" [tool.pyright] venvPath = '.' venv = '.venv' + +[tool.black] +line-length = 79 +skip-string-normalization = true diff --git a/xactfetch.py b/xactfetch.py index 203c91f..d3a0a04 100644 --- a/xactfetch.py +++ b/xactfetch.py @@ -1,3 +1,4 @@ +import asyncio import base64 import copy import datetime @@ -5,16 +6,15 @@ import json import logging import os import random -import socket import sys import urllib.parse from pathlib import Path from types import TracebackType from typing import Any, Optional, Type -import requests -from playwright.sync_api import Page -from playwright.sync_api import sync_playwright +import httpx +from playwright.async_api import Page +from playwright.async_api import async_playwright log = logging.getLogger('xactfetch') @@ -46,8 +46,9 @@ class FireflyImporter: self.url = url self.secret = secret self.auth = auth + self.client = httpx.AsyncClient() - def import_csv( + async def import_csv( self, csv: Path, config: dict[str, Any], @@ -55,58 +56,58 @@ class FireflyImporter: log.debug('Importing transactions from %s to Firefly III', csv) url = f'{self.url.rstrip("/")}/autoupload' with csv.open('rb') as f: - r = requests.post( - url, - auth=self.auth, - headers={ - 'Accept': 'application/json', - }, - params={ - 'secret': self.secret, - }, - files={ - 'importable': ('import.csv', f), - 'json': ('import.json', json.dumps(config)), - }, - ) - r.raise_for_status() + async with httpx.AsyncClient(auth=self.auth) as client: + r = await client.post( + url, + params={ + 'secret': self.secret, + }, + headers={ + 'Accept': 'application/json', + }, + files={ + 'importable': ('import.csv', f), + 'json': ('import.json', json.dumps(config)), + }, + timeout=300, + ) + r.raise_for_status() class SecretsClient: def __init__(self) -> None: - self.sock: socket.socket + self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter] - def __enter__(self) -> 'SecretsClient': + async def __aenter__(self) -> 'SecretsClient': if not hasattr(self, 'sock'): - self.connect() + await self.connect() return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], tb: Optional[TracebackType], ) -> bool: - self.sock.close() + self.sock[1].close() return False - def connect(self) -> None: + async def connect(self) -> None: if SECRET_SOCKET_PATH: path = Path(SECRET_SOCKET_PATH) elif XDG_RUNTIME_DIR: path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss' else: path = '.secretsocket' - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.sock.connect(str(path)) + self.sock = await asyncio.open_unix_connection(str(path)) - def get_secret(self, key: str) -> bytes: - self.sock.send(f'{key}\n'.encode('utf-8')) - buf = self.sock.recv(64 * 2**10) + async def get_secret(self, key: str) -> bytes: + self.sock[1].write(f'{key}\n'.encode('utf-8')) + buf = await self.sock[0].read(64 * 2**10) return buf.rstrip(b'\n') -def ntfy( +async def ntfy( message: Optional[str] = None, topic: str = NTFY_TOPIC, title: Optional[str] = None, @@ -121,6 +122,7 @@ def ntfy( if tags: headers['Tags'] = tags url = f'{NTFY_URL}/{topic}' + client = httpx.AsyncClient() if attach: if filename: headers['Filename'] = filename @@ -132,17 +134,19 @@ def ntfy( else: message = message.replace('\n', '\\n') headers['Message'] = message - r = requests.put( - url, - headers=headers, - data=attach, - ) + async with client: + r = await client.put( + url, + headers=headers, + content=attach, + ) else: - r = requests.post( - url, - headers=headers, - data=message, - ) + async with client: + r = await client.post( + url, + headers=headers, + content=message, + ) r.raise_for_status() @@ -153,15 +157,16 @@ def rfc2047_base64encode( return f"=?UTF-8?B?{encoded}?=" -def get_last_transaction_date(key: int, token: str) -> datetime.date: +async def get_last_transaction_date(key: int, token: str) -> datetime.date: url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions' - r = requests.get( - url, - headers={ - 'Authorization': f'Bearer {token}', - 'Accept': 'application/vnd.api+json', - }, - ) + async with httpx.AsyncClient() as client: + r = await client.get( + url, + headers={ + 'Authorization': f'Bearer {token}', + 'Accept': 'application/vnd.api+json', + }, + ) r.raise_for_status() last_date = datetime.datetime.min for xact in r.json()['data']: @@ -181,17 +186,17 @@ def get_last_transaction_date(key: int, token: str) -> datetime.date: return last_date.date() -def download_chase( +async def download_chase( page: Page, secrets: SecretsClient, end_date: datetime.date, token: str, importer: FireflyImporter, ) -> bool: - with Chase(page, secrets) as c, ntfyerror('Chase', page) as r: + async with Chase(page, secrets) as c, ntfyerror('Chase', page) as r: key = ACCOUNTS['chase'] try: - start_date = get_last_transaction_date(key, token) + start_date = await get_last_transaction_date(key, token) except (OSError, ValueError) as e: log.error( 'Skipping Chase account: could not get last transaction: %s', @@ -204,14 +209,14 @@ def download_chase( start_date, ) return True - c.login() - csv = c.download_transactions(start_date, end_date) + await c.login() + csv = await c.download_transactions(start_date, end_date) log.info('Importing transactions from Chase into Firefly III') - c.firefly_import(csv, key, importer) + await c.firefly_import(csv, key, importer) return r.success -def download_commerce( +async def download_commerce( page: Page, secrets: SecretsClient, end_date: datetime.date, @@ -220,12 +225,12 @@ def download_commerce( ) -> bool: log.info('Downloading transaction lists from Commerce Bank') csvs = [] - with CommerceBank(page, secrets) as c, ntfyerror( + async with CommerceBank(page, secrets) as c, ntfyerror( 'Commerce Bank', page ) as r: for name, key in ACCOUNTS['commerce'].items(): try: - start_date = get_last_transaction_date(key, token) + start_date = await get_last_transaction_date(key, token) except (OSError, ValueError) as e: log.error( 'Skipping account %s: could not get last transaction: %s', @@ -245,12 +250,14 @@ def download_commerce( start_date, name, ) - c.login() - c.open_account(name) - csvs.append((key, c.download_transactions(start_date, end_date))) + await c.login() + await c.open_account(name) + csvs.append( + (key, await c.download_transactions(start_date, end_date)) + ) log.info('Importing transactions from Commerce Bank into Firefly III') for key, csv in csvs: - c.firefly_import(csv, key, importer) + await c.firefly_import(csv, key, importer) return r.success @@ -260,10 +267,10 @@ class ntfyerror: self.page = page self.success = True - def __enter__(self) -> 'ntfyerror': + async def __aenter__(self) -> 'ntfyerror': return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], @@ -276,9 +283,9 @@ class ntfyerror: ) if os.environ.get('DEBUG_NTFY', '1') == '0': return True - if ss := self.page.screenshot(): - save_screenshot(ss) - ntfy( + if ss := await self.page.screenshot(): + await asyncio.to_thread(save_screenshot, ss) + await ntfy( message=str(exc_value), title=f'xactfetch failed for {self.bank}', tags='warning', @@ -356,99 +363,109 @@ class CommerceBank: self.secrets = secrets self._logged_in = False - def __enter__(self) -> 'CommerceBank': + async def __aenter__(self) -> 'CommerceBank': return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], tb: Optional[TracebackType], ) -> None: - self.logout() + await self.logout() - def login(self) -> None: + async def login(self) -> None: if self._logged_in: return log.debug('Navigating to %s', self.URL) - self.page.goto(self.URL) - username = self.secrets.get_secret('bank.commerce.username').decode() - password = self.secrets.get_secret('bank.commerce.password').decode() + await self.page.goto(self.URL) + username = await self.get_secret('bank.commerce.username') + password = await self.get_secret('bank.commerce.password') log.debug('Filling username/password login form') - self.page.get_by_role('textbox', name='Customer ID').fill(username) - self.page.get_by_role('textbox', name='Password').fill(password) - self.page.get_by_role('button', name='Log In').click() + await self.page.get_by_role('textbox', name='Customer ID').fill( + username + ) + await self.page.get_by_role('textbox', name='Password').fill(password) + await self.page.get_by_role('button', name='Log In').click() log.debug('Waiting for OTP 2FA form') otp_input = self.page.locator('id=securityCodeInput') - otp_input.wait_for() - self.page.wait_for_timeout(random.randint(1000, 3000)) + await otp_input.wait_for() + await self.page.wait_for_timeout(random.randint(1000, 3000)) log.debug('Filling OTP 2FA form') - otp = self.secrets.get_secret('bank.commerce.otp').decode() - otp_input.fill(otp) - with self.page.expect_event('load'): - self.page.get_by_role('button', name='Continue').click() + otp = await self.get_secret('bank.commerce.otp') + await otp_input.fill(otp) + async with self.page.expect_event('load'): + await self.page.get_by_role('button', name='Continue').click() log.debug('Waiting for page load') - self.page.wait_for_load_state() + await self.page.wait_for_load_state() cur_url = urllib.parse.urlparse(self.page.url) if cur_url.path != '/CBI/Accounts/Summary': new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='') - self.page.goto(urllib.parse.urlunparse(new_url)) + await self.page.goto(urllib.parse.urlunparse(new_url)) log.info('Successfully logged in to Commerce Bank') self._logged_in = True - def logout(self) -> None: + async def logout(self) -> None: if not self._logged_in: return log.debug('Logging out of Commerce Bank') - with self.page.expect_event('load'): - self.page.get_by_test_id('navWrap').get_by_text('Logout').click() + async with self.page.expect_event('load'): + await self.page.get_by_test_id('navWrap').get_by_text( + 'Logout' + ).click() log.info('Logged out of Commerce Bank') - def open_account(self, account: str) -> None: + async def open_account(self, account: str) -> None: log.debug('Navigating to activity page for account %s', account) if '/Activity/' in self.page.url: - self.page.get_by_role('button', name='My Accounts').click() - with self.page.expect_event('load'): - self.page.get_by_role('link', name=account).click() - self.page.wait_for_load_state() - self.page.wait_for_timeout(random.randint(1000, 3000)) + await self.page.get_by_role('button', name='My Accounts').click() + async with self.page.expect_event('load'): + await self.page.get_by_role('link', name=account).click() + await self.page.wait_for_load_state() + await self.page.wait_for_timeout(random.randint(1000, 3000)) log.info('Loaded activity page for account %s', account) - def download_transactions( + async def download_transactions( self, from_date: datetime.date, to_date: datetime.date ) -> Path: log.info('Downloading transactions from %s to %s', from_date, to_date) datefmt = '%m/%d/%Y' - self.page.get_by_role('link', name='Download Transactions').click() - self.page.wait_for_timeout(random.randint(750, 1250)) + await self.page.get_by_role( + 'link', name='Download Transactions' + ).click() + await self.page.wait_for_timeout(random.randint(750, 1250)) modal = self.page.locator('#download-transactions') input_from = modal.locator('input[data-qaid=fromDate]') - input_from.click() - self.page.keyboard.press('Control+A') - self.page.keyboard.press('Delete') - self.page.keyboard.type(from_date.strftime(datefmt)) + await input_from.click() + await self.page.keyboard.press('Control+A') + await self.page.keyboard.press('Delete') + await self.page.keyboard.type(from_date.strftime(datefmt)) input_to = modal.locator('input[data-qaid=toDate]') - input_to.click() - self.page.keyboard.press('Control+A') - self.page.keyboard.press('Delete') - self.page.keyboard.type(to_date.strftime(datefmt)) - modal.get_by_role('button', name='Select Type').click() - self.page.get_by_text('Comma Separated').click() - with self.page.expect_download() as di: - self.page.get_by_role('button', name='Download').click() + await input_to.click() + await self.page.keyboard.press('Control+A') + await self.page.keyboard.press('Delete') + await self.page.keyboard.type(to_date.strftime(datefmt)) + await modal.get_by_role('button', name='Select Type').click() + await self.page.get_by_text('Comma Separated').click() + async with self.page.expect_download() as di: + await self.page.get_by_role('button', name='Download').click() log.debug('Waiting for download to complete') - path = di.value.path() + path = await (await di.value).path() assert path log.info('Downloaded transactions to %s', path) - modal.get_by_label('Close').click() + await modal.get_by_label('Close').click() return path - def firefly_import( + async def firefly_import( self, csv: Path, account: int, importer: FireflyImporter ) -> None: config = copy.deepcopy(self.IMPORT_CONFIG) config['default_account'] = account - importer.import_csv(csv, config) + await importer.import_csv(csv, config) + + async def get_secret(self, key: str) -> str: + secret = await self.secrets.get_secret(key) + return secret.decode() class Chase: @@ -513,26 +530,27 @@ class Chase: self.saved_cookies = Path('cookies.json') self._logged_in = False - def __enter__(self) -> 'Chase': - self.load_cookies() + async def __aenter__(self) -> 'Chase': + await self.load_cookies() return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], tb: Optional[TracebackType], ) -> None: try: - self.logout() + await self.logout() finally: - self.save_cookies() + await self.save_cookies() - def load_cookies(self) -> None: + async def load_cookies(self) -> None: log.debug('Loading saved cookies from %s', self.saved_cookies) try: with self.saved_cookies.open(encoding='utf-8') as f: - self.page.context.add_cookies(json.load(f)) + cookies = await asyncio.to_thread(json.load, f) + await self.page.context.add_cookies(cookies) except: log.warning( 'Could not load saved cookies, ' @@ -541,94 +559,101 @@ class Chase: else: log.info('Successfully loaded saved cookies') - def save_cookies(self) -> None: + async def save_cookies(self) -> None: log.debug('Saving cookies from %s', self.saved_cookies) try: with self.saved_cookies.open('w', encoding='utf-8') as f: - f.write(json.dumps(self.page.context.cookies())) + cookies = await self.page.context.cookies() + f.write(await asyncio.to_thread(json.dumps, cookies)) except Exception as e: log.error('Failed to save cookies: %s', e) else: log.info('Successfully saved cookies to %s', self.saved_cookies) - def login(self) -> None: + async def login(self) -> None: if self._logged_in: return log.debug('Navigating to %s', self.URL) - self.page.goto(self.URL) - self.page.wait_for_load_state() - self.page.wait_for_timeout(random.randint(2000, 4000)) - username = self.secrets.get_secret('bank.chase.username').decode() - password = self.secrets.get_secret('bank.chase.password').decode() + await self.page.goto(self.URL) + await self.page.wait_for_load_state() + await self.page.wait_for_timeout(random.randint(2000, 4000)) + username = await self.get_secret('bank.chase.username') + password = await self.get_secret('bank.chase.password') log.debug('Filling username/password login form') logonbox = self.page.frame_locator('#logonbox') - logonbox.get_by_label('Username').fill(username) - logonbox.get_by_label('Password').fill(password) - self.page.wait_for_timeout(random.randint(500, 750)) - logonbox.get_by_role('button', name='Sign in').click() + await logonbox.get_by_label('Username').fill(username) + await logonbox.get_by_label('Password').fill(password) + await self.page.wait_for_timeout(random.randint(500, 750)) + await logonbox.get_by_role('button', name='Sign in').click() log.debug('Waiting for page load') - self.page.wait_for_load_state() - self.page.get_by_role('button', name='Pay Card').wait_for( + await self.page.wait_for_load_state() + await self.page.get_by_role('button', name='Pay Card').wait_for( timeout=120000 ) log.info('Successfully logged in to Chase') self._logged_in = True - def download_transactions( + async def download_transactions( self, from_date: datetime.date, to_date: datetime.date ) -> Path: log.info('Downloading transactions from %s to %s', from_date, to_date) fmt = '%m/%d/%Y' - self.page.locator('#CARD_ACCOUNTS').get_by_role( + await self.page.locator('#CARD_ACCOUNTS').get_by_role( 'button', name='(...2467)' ).first.click() fl = self.page.locator('#flyout') - fl.wait_for() - fl.get_by_role('button', name='Pay card', exact=True).wait_for() - fl.get_by_role( + await fl.wait_for() + await fl.get_by_role('button', name='Pay card', exact=True).wait_for() + await fl.get_by_role( 'button', name='Account activity', exact=True ).wait_for() - fl.get_by_role('link', name='Show details').wait_for() - fl.get_by_role('link', name='Activity since last statement').click() - fl.get_by_role('link', name='All transactions').click() - fl.get_by_text('See more activity').wait_for() - fl.get_by_role('button', name='Download Account Activity').click() + await fl.get_by_role('link', name='Show details').wait_for() + await fl.get_by_role( + 'link', name='Activity since last statement' + ).click() + await fl.get_by_role('link', name='All transactions').click() + await fl.get_by_text('See more activity').wait_for() + await fl.get_by_role( + 'button', name='Download Account Activity' + ).click() log.debug('Filling account activity download form') - self.page.locator('#select-downloadActivityOptionId-label').click() - self.page.get_by_text('Choose a date range').nth(1).locator( + await self.page.locator( + '#select-downloadActivityOptionId-label' + ).click() + await self.page.get_by_text('Choose a date range').nth(1).locator( '../..' ).click() - self.page.wait_for_timeout(random.randint(500, 1500)) - self.page.locator('#accountActivityFromDate-input-input').fill( + await self.page.wait_for_timeout(random.randint(500, 1500)) + await self.page.locator('#accountActivityFromDate-input-input').fill( from_date.strftime(fmt) ) - self.page.locator('#accountActivityFromDate-input-input').blur() - self.page.wait_for_timeout(random.randint(500, 1500)) - self.page.locator('#accountActivityToDate-input-input').fill( + await self.page.locator('#accountActivityFromDate-input-input').blur() + await self.page.wait_for_timeout(random.randint(500, 1500)) + await self.page.locator('#accountActivityToDate-input-input').fill( to_date.strftime(fmt) ) - self.page.locator('#accountActivityToDate-input-input').blur() - self.page.wait_for_timeout(random.randint(500, 1500)) - with self.page.expect_download(timeout=5000) as di: - self.page.get_by_role( + await self.page.locator('#accountActivityToDate-input-input').blur() + await self.page.wait_for_timeout(random.randint(500, 1500)) + async with self.page.expect_download(timeout=5000) as di: + await self.page.get_by_role( 'button', name='Download', exact=True ).click() log.debug('Waiting for download to complete') - self.page.wait_for_timeout(random.randint(1000, 2500)) - path = di.value.path() + await self.page.wait_for_timeout(random.randint(1000, 2500)) + path = await (await di.value).path() assert path log.info('Downloaded transactions to %s', path) return path - def logout(self) -> None: + async def logout(self) -> None: if not self._logged_in: return log.debug('Logging out of Chase') - with self.page.expect_event('load'): - self.page.get_by_role('button', name='Sign out').click() + async with self.page.expect_event('load'): + await self.page.get_by_role('button', name='Sign out').click() log.info('Logged out of Chase') - def firefly_import( + async def firefly_import( self, csv: Path, account: int, importer: FireflyImporter ) -> None: config = copy.deepcopy(self.IMPORT_CONFIG) @@ -643,38 +668,52 @@ class Chase: config['do_mapping'].pop(0) else: raise ValueError(f'Unexpected CSV schema: {headers}') - importer.import_csv(csv, config) + await importer.import_csv(csv, config) + + async def get_secret(self, key: str) -> str: + secret = await self.secrets.get_secret(key) + return secret.decode() -def main() -> None: +async def amain() -> None: logging.basicConfig(level=logging.DEBUG) secrets = SecretsClient() - secrets.connect() + await secrets.connect() log.debug('Getting Firefly III access token') - token = secrets.get_secret('firefly.token').decode() - import_secret = secrets.get_secret('firefly.import.secret').decode() + token = (await secrets.get_secret('firefly.token')).decode() + import_secret = ( + await secrets.get_secret('firefly.import.secret') + ).decode() import_auth = ( - secrets.get_secret('firefly.import.username').decode(), - secrets.get_secret('firefly.import.password').decode(), + (await secrets.get_secret('firefly.import.username')).decode(), + (await secrets.get_secret('firefly.import.password')).decode(), ) importer = FireflyImporter( FIREFLY_III_IMPORTER_URL, import_secret, import_auth ) end_date = datetime.date.today() - datetime.timedelta(days=1) - with sync_playwright() as pw, secrets: + failed = False + async with async_playwright() as pw, secrets: headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1' - browser = pw.firefox.launch(headless=headless) - page = browser.new_page() - failed = False + browser = await pw.firefox.launch(headless=headless) + page = await browser.new_page() banks = sys.argv[1:] or list(ACCOUNTS.keys()) if 'commerce' in banks: - if not download_commerce(page, secrets, end_date, token, importer): + if not await download_commerce( + page, secrets, end_date, token, importer + ): failed = True if 'chase' in banks: - if not download_chase(page, secrets, end_date, token, importer): + if not await download_chase( + page, secrets, end_date, token, importer + ): failed = True raise SystemExit(1 if failed else 0) +def main(): + asyncio.run(amain()) + + if __name__ == '__main__': main()