From 43aba0c848cd20e9465a19720a77d8cd6de60e6b Mon Sep 17 00:00:00 2001 From: "Dustin C. Hatch" Date: Mon, 8 Jul 2024 18:12:22 -0500 Subject: [PATCH] Switch to async API Using the Playwright async API is the only way to wait for one of multiple conditions. We will need this capability in order to detect certain abnormal conditions, such as spurious 2FA auth or interstitial ads. --- pyproject.toml | 5 + xactfetch.py | 389 +++++++++++++++++++++++++++---------------------- 2 files changed, 219 insertions(+), 175 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0edcdd8..87d020f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ classifiers = [ "Programming Language :: Python :: 3", ] dependencies = [ + "httpx~=0.27.0", "playwright~=1.32", "requests~=2.29.0", ] @@ -28,3 +29,7 @@ build-backend = "setuptools.build_meta" [tool.pyright] venvPath = '.' venv = '.venv' + +[tool.black] +line-length = 79 +skip-string-normalization = true diff --git a/xactfetch.py b/xactfetch.py index 203c91f..d3a0a04 100644 --- a/xactfetch.py +++ b/xactfetch.py @@ -1,3 +1,4 @@ +import asyncio import base64 import copy import datetime @@ -5,16 +6,15 @@ import json import logging import os import random -import socket import sys import urllib.parse from pathlib import Path from types import TracebackType from typing import Any, Optional, Type -import requests -from playwright.sync_api import Page -from playwright.sync_api import sync_playwright +import httpx +from playwright.async_api import Page +from playwright.async_api import async_playwright log = logging.getLogger('xactfetch') @@ -46,8 +46,9 @@ class FireflyImporter: self.url = url self.secret = secret self.auth = auth + self.client = httpx.AsyncClient() - def import_csv( + async def import_csv( self, csv: Path, config: dict[str, Any], @@ -55,58 +56,58 @@ class FireflyImporter: log.debug('Importing transactions from %s to Firefly III', csv) url = f'{self.url.rstrip("/")}/autoupload' with csv.open('rb') as f: - r 
= requests.post( - url, - auth=self.auth, - headers={ - 'Accept': 'application/json', - }, - params={ - 'secret': self.secret, - }, - files={ - 'importable': ('import.csv', f), - 'json': ('import.json', json.dumps(config)), - }, - ) - r.raise_for_status() + async with httpx.AsyncClient(auth=self.auth) as client: + r = await client.post( + url, + params={ + 'secret': self.secret, + }, + headers={ + 'Accept': 'application/json', + }, + files={ + 'importable': ('import.csv', f), + 'json': ('import.json', json.dumps(config)), + }, + timeout=300, + ) + r.raise_for_status() class SecretsClient: def __init__(self) -> None: - self.sock: socket.socket + self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter] - def __enter__(self) -> 'SecretsClient': + async def __aenter__(self) -> 'SecretsClient': if not hasattr(self, 'sock'): - self.connect() + await self.connect() return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], tb: Optional[TracebackType], ) -> bool: - self.sock.close() + self.sock[1].close() return False - def connect(self) -> None: + async def connect(self) -> None: if SECRET_SOCKET_PATH: path = Path(SECRET_SOCKET_PATH) elif XDG_RUNTIME_DIR: path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss' else: path = '.secretsocket' - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.sock.connect(str(path)) + self.sock = await asyncio.open_unix_connection(str(path)) - def get_secret(self, key: str) -> bytes: - self.sock.send(f'{key}\n'.encode('utf-8')) - buf = self.sock.recv(64 * 2**10) + async def get_secret(self, key: str) -> bytes: + self.sock[1].write(f'{key}\n'.encode('utf-8')) + buf = await self.sock[0].read(64 * 2**10) return buf.rstrip(b'\n') -def ntfy( +async def ntfy( message: Optional[str] = None, topic: str = NTFY_TOPIC, title: Optional[str] = None, @@ -121,6 +122,7 @@ def ntfy( if tags: headers['Tags'] = tags url = f'{NTFY_URL}/{topic}' + client = httpx.AsyncClient() if 
attach: if filename: headers['Filename'] = filename @@ -132,17 +134,19 @@ def ntfy( else: message = message.replace('\n', '\\n') headers['Message'] = message - r = requests.put( - url, - headers=headers, - data=attach, - ) + async with client: + r = await client.put( + url, + headers=headers, + content=attach, + ) else: - r = requests.post( - url, - headers=headers, - data=message, - ) + async with client: + r = await client.post( + url, + headers=headers, + content=message, + ) r.raise_for_status() @@ -153,15 +157,16 @@ def rfc2047_base64encode( return f"=?UTF-8?B?{encoded}?=" -def get_last_transaction_date(key: int, token: str) -> datetime.date: +async def get_last_transaction_date(key: int, token: str) -> datetime.date: url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions' - r = requests.get( - url, - headers={ - 'Authorization': f'Bearer {token}', - 'Accept': 'application/vnd.api+json', - }, - ) + async with httpx.AsyncClient() as client: + r = await client.get( + url, + headers={ + 'Authorization': f'Bearer {token}', + 'Accept': 'application/vnd.api+json', + }, + ) r.raise_for_status() last_date = datetime.datetime.min for xact in r.json()['data']: @@ -181,17 +186,17 @@ def get_last_transaction_date(key: int, token: str) -> datetime.date: return last_date.date() -def download_chase( +async def download_chase( page: Page, secrets: SecretsClient, end_date: datetime.date, token: str, importer: FireflyImporter, ) -> bool: - with Chase(page, secrets) as c, ntfyerror('Chase', page) as r: + async with Chase(page, secrets) as c, ntfyerror('Chase', page) as r: key = ACCOUNTS['chase'] try: - start_date = get_last_transaction_date(key, token) + start_date = await get_last_transaction_date(key, token) except (OSError, ValueError) as e: log.error( 'Skipping Chase account: could not get last transaction: %s', @@ -204,14 +209,14 @@ def download_chase( start_date, ) return True - c.login() - csv = c.download_transactions(start_date, end_date) + await c.login() + csv = 
await c.download_transactions(start_date, end_date) log.info('Importing transactions from Chase into Firefly III') - c.firefly_import(csv, key, importer) + await c.firefly_import(csv, key, importer) return r.success -def download_commerce( +async def download_commerce( page: Page, secrets: SecretsClient, end_date: datetime.date, @@ -220,12 +225,12 @@ def download_commerce( ) -> bool: log.info('Downloading transaction lists from Commerce Bank') csvs = [] - with CommerceBank(page, secrets) as c, ntfyerror( + async with CommerceBank(page, secrets) as c, ntfyerror( 'Commerce Bank', page ) as r: for name, key in ACCOUNTS['commerce'].items(): try: - start_date = get_last_transaction_date(key, token) + start_date = await get_last_transaction_date(key, token) except (OSError, ValueError) as e: log.error( 'Skipping account %s: could not get last transaction: %s', @@ -245,12 +250,14 @@ def download_commerce( start_date, name, ) - c.login() - c.open_account(name) - csvs.append((key, c.download_transactions(start_date, end_date))) + await c.login() + await c.open_account(name) + csvs.append( + (key, await c.download_transactions(start_date, end_date)) + ) log.info('Importing transactions from Commerce Bank into Firefly III') for key, csv in csvs: - c.firefly_import(csv, key, importer) + await c.firefly_import(csv, key, importer) return r.success @@ -260,10 +267,10 @@ class ntfyerror: self.page = page self.success = True - def __enter__(self) -> 'ntfyerror': + async def __aenter__(self) -> 'ntfyerror': return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], @@ -276,9 +283,9 @@ class ntfyerror: ) if os.environ.get('DEBUG_NTFY', '1') == '0': return True - if ss := self.page.screenshot(): - save_screenshot(ss) - ntfy( + if ss := await self.page.screenshot(): + await asyncio.to_thread(save_screenshot, ss) + await ntfy( message=str(exc_value), title=f'xactfetch failed for {self.bank}', tags='warning', @@ -356,99 
+363,109 @@ class CommerceBank: self.secrets = secrets self._logged_in = False - def __enter__(self) -> 'CommerceBank': + async def __aenter__(self) -> 'CommerceBank': return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], tb: Optional[TracebackType], ) -> None: - self.logout() + await self.logout() - def login(self) -> None: + async def login(self) -> None: if self._logged_in: return log.debug('Navigating to %s', self.URL) - self.page.goto(self.URL) - username = self.secrets.get_secret('bank.commerce.username').decode() - password = self.secrets.get_secret('bank.commerce.password').decode() + await self.page.goto(self.URL) + username = await self.get_secret('bank.commerce.username') + password = await self.get_secret('bank.commerce.password') log.debug('Filling username/password login form') - self.page.get_by_role('textbox', name='Customer ID').fill(username) - self.page.get_by_role('textbox', name='Password').fill(password) - self.page.get_by_role('button', name='Log In').click() + await self.page.get_by_role('textbox', name='Customer ID').fill( + username + ) + await self.page.get_by_role('textbox', name='Password').fill(password) + await self.page.get_by_role('button', name='Log In').click() log.debug('Waiting for OTP 2FA form') otp_input = self.page.locator('id=securityCodeInput') - otp_input.wait_for() - self.page.wait_for_timeout(random.randint(1000, 3000)) + await otp_input.wait_for() + await self.page.wait_for_timeout(random.randint(1000, 3000)) log.debug('Filling OTP 2FA form') - otp = self.secrets.get_secret('bank.commerce.otp').decode() - otp_input.fill(otp) - with self.page.expect_event('load'): - self.page.get_by_role('button', name='Continue').click() + otp = await self.get_secret('bank.commerce.otp') + await otp_input.fill(otp) + async with self.page.expect_event('load'): + await self.page.get_by_role('button', name='Continue').click() log.debug('Waiting for page load') - 
self.page.wait_for_load_state() + await self.page.wait_for_load_state() cur_url = urllib.parse.urlparse(self.page.url) if cur_url.path != '/CBI/Accounts/Summary': new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='') - self.page.goto(urllib.parse.urlunparse(new_url)) + await self.page.goto(urllib.parse.urlunparse(new_url)) log.info('Successfully logged in to Commerce Bank') self._logged_in = True - def logout(self) -> None: + async def logout(self) -> None: if not self._logged_in: return log.debug('Logging out of Commerce Bank') - with self.page.expect_event('load'): - self.page.get_by_test_id('navWrap').get_by_text('Logout').click() + async with self.page.expect_event('load'): + await self.page.get_by_test_id('navWrap').get_by_text( + 'Logout' + ).click() log.info('Logged out of Commerce Bank') - def open_account(self, account: str) -> None: + async def open_account(self, account: str) -> None: log.debug('Navigating to activity page for account %s', account) if '/Activity/' in self.page.url: - self.page.get_by_role('button', name='My Accounts').click() - with self.page.expect_event('load'): - self.page.get_by_role('link', name=account).click() - self.page.wait_for_load_state() - self.page.wait_for_timeout(random.randint(1000, 3000)) + await self.page.get_by_role('button', name='My Accounts').click() + async with self.page.expect_event('load'): + await self.page.get_by_role('link', name=account).click() + await self.page.wait_for_load_state() + await self.page.wait_for_timeout(random.randint(1000, 3000)) log.info('Loaded activity page for account %s', account) - def download_transactions( + async def download_transactions( self, from_date: datetime.date, to_date: datetime.date ) -> Path: log.info('Downloading transactions from %s to %s', from_date, to_date) datefmt = '%m/%d/%Y' - self.page.get_by_role('link', name='Download Transactions').click() - self.page.wait_for_timeout(random.randint(750, 1250)) + await self.page.get_by_role( + 'link', 
name='Download Transactions' + ).click() + await self.page.wait_for_timeout(random.randint(750, 1250)) modal = self.page.locator('#download-transactions') input_from = modal.locator('input[data-qaid=fromDate]') - input_from.click() - self.page.keyboard.press('Control+A') - self.page.keyboard.press('Delete') - self.page.keyboard.type(from_date.strftime(datefmt)) + await input_from.click() + await self.page.keyboard.press('Control+A') + await self.page.keyboard.press('Delete') + await self.page.keyboard.type(from_date.strftime(datefmt)) input_to = modal.locator('input[data-qaid=toDate]') - input_to.click() - self.page.keyboard.press('Control+A') - self.page.keyboard.press('Delete') - self.page.keyboard.type(to_date.strftime(datefmt)) - modal.get_by_role('button', name='Select Type').click() - self.page.get_by_text('Comma Separated').click() - with self.page.expect_download() as di: - self.page.get_by_role('button', name='Download').click() + await input_to.click() + await self.page.keyboard.press('Control+A') + await self.page.keyboard.press('Delete') + await self.page.keyboard.type(to_date.strftime(datefmt)) + await modal.get_by_role('button', name='Select Type').click() + await self.page.get_by_text('Comma Separated').click() + async with self.page.expect_download() as di: + await self.page.get_by_role('button', name='Download').click() log.debug('Waiting for download to complete') - path = di.value.path() + path = await (await di.value).path() assert path log.info('Downloaded transactions to %s', path) - modal.get_by_label('Close').click() + await modal.get_by_label('Close').click() return path - def firefly_import( + async def firefly_import( self, csv: Path, account: int, importer: FireflyImporter ) -> None: config = copy.deepcopy(self.IMPORT_CONFIG) config['default_account'] = account - importer.import_csv(csv, config) + await importer.import_csv(csv, config) + + async def get_secret(self, key: str) -> str: + secret = await self.secrets.get_secret(key) + return 
secret.decode() class Chase: @@ -513,26 +530,27 @@ class Chase: self.saved_cookies = Path('cookies.json') self._logged_in = False - def __enter__(self) -> 'Chase': - self.load_cookies() + async def __aenter__(self) -> 'Chase': + await self.load_cookies() return self - def __exit__( + async def __aexit__( self, exc_type: Optional[Type[Exception]], exc_value: Optional[Exception], tb: Optional[TracebackType], ) -> None: try: - self.logout() + await self.logout() finally: - self.save_cookies() + await self.save_cookies() - def load_cookies(self) -> None: + async def load_cookies(self) -> None: log.debug('Loading saved cookies from %s', self.saved_cookies) try: with self.saved_cookies.open(encoding='utf-8') as f: - self.page.context.add_cookies(json.load(f)) + cookies = await asyncio.to_thread(json.load, f) + await self.page.context.add_cookies(cookies) except: log.warning( 'Could not load saved cookies, ' @@ -541,94 +559,101 @@ class Chase: else: log.info('Successfully loaded saved cookies') - def save_cookies(self) -> None: + async def save_cookies(self) -> None: log.debug('Saving cookies from %s', self.saved_cookies) try: with self.saved_cookies.open('w', encoding='utf-8') as f: - f.write(json.dumps(self.page.context.cookies())) + cookies = await self.page.context.cookies() + f.write(await asyncio.to_thread(json.dumps, cookies)) except Exception as e: log.error('Failed to save cookies: %s', e) else: log.info('Successfully saved cookies to %s', self.saved_cookies) - def login(self) -> None: + async def login(self) -> None: if self._logged_in: return log.debug('Navigating to %s', self.URL) - self.page.goto(self.URL) - self.page.wait_for_load_state() - self.page.wait_for_timeout(random.randint(2000, 4000)) - username = self.secrets.get_secret('bank.chase.username').decode() - password = self.secrets.get_secret('bank.chase.password').decode() + await self.page.goto(self.URL) + await self.page.wait_for_load_state() + await self.page.wait_for_timeout(random.randint(2000, 
4000)) + username = await self.get_secret('bank.chase.username') + password = await self.get_secret('bank.chase.password') log.debug('Filling username/password login form') logonbox = self.page.frame_locator('#logonbox') - logonbox.get_by_label('Username').fill(username) - logonbox.get_by_label('Password').fill(password) - self.page.wait_for_timeout(random.randint(500, 750)) - logonbox.get_by_role('button', name='Sign in').click() + await logonbox.get_by_label('Username').fill(username) + await logonbox.get_by_label('Password').fill(password) + await self.page.wait_for_timeout(random.randint(500, 750)) + await logonbox.get_by_role('button', name='Sign in').click() log.debug('Waiting for page load') - self.page.wait_for_load_state() - self.page.get_by_role('button', name='Pay Card').wait_for( + await self.page.wait_for_load_state() + await self.page.get_by_role('button', name='Pay Card').wait_for( timeout=120000 ) log.info('Successfully logged in to Chase') self._logged_in = True - def download_transactions( + async def download_transactions( self, from_date: datetime.date, to_date: datetime.date ) -> Path: log.info('Downloading transactions from %s to %s', from_date, to_date) fmt = '%m/%d/%Y' - self.page.locator('#CARD_ACCOUNTS').get_by_role( + await self.page.locator('#CARD_ACCOUNTS').get_by_role( 'button', name='(...2467)' ).first.click() fl = self.page.locator('#flyout') - fl.wait_for() - fl.get_by_role('button', name='Pay card', exact=True).wait_for() - fl.get_by_role( + await fl.wait_for() + await fl.get_by_role('button', name='Pay card', exact=True).wait_for() + await fl.get_by_role( 'button', name='Account activity', exact=True ).wait_for() - fl.get_by_role('link', name='Show details').wait_for() - fl.get_by_role('link', name='Activity since last statement').click() - fl.get_by_role('link', name='All transactions').click() - fl.get_by_text('See more activity').wait_for() - fl.get_by_role('button', name='Download Account Activity').click() + await 
fl.get_by_role('link', name='Show details').wait_for() + await fl.get_by_role( + 'link', name='Activity since last statement' + ).click() + await fl.get_by_role('link', name='All transactions').click() + await fl.get_by_text('See more activity').wait_for() + await fl.get_by_role( + 'button', name='Download Account Activity' + ).click() log.debug('Filling account activity download form') - self.page.locator('#select-downloadActivityOptionId-label').click() - self.page.get_by_text('Choose a date range').nth(1).locator( + await self.page.locator( + '#select-downloadActivityOptionId-label' + ).click() + await self.page.get_by_text('Choose a date range').nth(1).locator( '../..' ).click() - self.page.wait_for_timeout(random.randint(500, 1500)) - self.page.locator('#accountActivityFromDate-input-input').fill( + await self.page.wait_for_timeout(random.randint(500, 1500)) + await self.page.locator('#accountActivityFromDate-input-input').fill( from_date.strftime(fmt) ) - self.page.locator('#accountActivityFromDate-input-input').blur() - self.page.wait_for_timeout(random.randint(500, 1500)) - self.page.locator('#accountActivityToDate-input-input').fill( + await self.page.locator('#accountActivityFromDate-input-input').blur() + await self.page.wait_for_timeout(random.randint(500, 1500)) + await self.page.locator('#accountActivityToDate-input-input').fill( to_date.strftime(fmt) ) - self.page.locator('#accountActivityToDate-input-input').blur() - self.page.wait_for_timeout(random.randint(500, 1500)) - with self.page.expect_download(timeout=5000) as di: - self.page.get_by_role( + await self.page.locator('#accountActivityToDate-input-input').blur() + await self.page.wait_for_timeout(random.randint(500, 1500)) + async with self.page.expect_download(timeout=5000) as di: + await self.page.get_by_role( 'button', name='Download', exact=True ).click() log.debug('Waiting for download to complete') - self.page.wait_for_timeout(random.randint(1000, 2500)) - path = di.value.path() + await 
self.page.wait_for_timeout(random.randint(1000, 2500)) + path = await (await di.value).path() assert path log.info('Downloaded transactions to %s', path) return path - def logout(self) -> None: + async def logout(self) -> None: if not self._logged_in: return log.debug('Logging out of Chase') - with self.page.expect_event('load'): - self.page.get_by_role('button', name='Sign out').click() + async with self.page.expect_event('load'): + await self.page.get_by_role('button', name='Sign out').click() log.info('Logged out of Chase') - def firefly_import( + async def firefly_import( self, csv: Path, account: int, importer: FireflyImporter ) -> None: config = copy.deepcopy(self.IMPORT_CONFIG) @@ -643,38 +668,52 @@ class Chase: config['do_mapping'].pop(0) else: raise ValueError(f'Unexpected CSV schema: {headers}') - importer.import_csv(csv, config) + await importer.import_csv(csv, config) + + async def get_secret(self, key: str) -> str: + secret = await self.secrets.get_secret(key) + return secret.decode() -def main() -> None: +async def amain() -> None: logging.basicConfig(level=logging.DEBUG) secrets = SecretsClient() - secrets.connect() + await secrets.connect() log.debug('Getting Firefly III access token') - token = secrets.get_secret('firefly.token').decode() - import_secret = secrets.get_secret('firefly.import.secret').decode() + token = (await secrets.get_secret('firefly.token')).decode() + import_secret = ( + await secrets.get_secret('firefly.import.secret') + ).decode() import_auth = ( - secrets.get_secret('firefly.import.username').decode(), - secrets.get_secret('firefly.import.password').decode(), + (await secrets.get_secret('firefly.import.username')).decode(), + (await secrets.get_secret('firefly.import.password')).decode(), ) importer = FireflyImporter( FIREFLY_III_IMPORTER_URL, import_secret, import_auth ) end_date = datetime.date.today() - datetime.timedelta(days=1) - with sync_playwright() as pw, secrets: + failed = False + async with async_playwright() as 
pw, secrets: headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1' - browser = pw.firefox.launch(headless=headless) - page = browser.new_page() - failed = False + browser = await pw.firefox.launch(headless=headless) + page = await browser.new_page() banks = sys.argv[1:] or list(ACCOUNTS.keys()) if 'commerce' in banks: - if not download_commerce(page, secrets, end_date, token, importer): + if not await download_commerce( + page, secrets, end_date, token, importer + ): failed = True if 'chase' in banks: - if not download_chase(page, secrets, end_date, token, importer): + if not await download_chase( + page, secrets, end_date, token, importer + ): failed = True raise SystemExit(1 if failed else 0) +def main(): + asyncio.run(amain()) + + if __name__ == '__main__': main()