134 lines
3.6 KiB
Python
134 lines
3.6 KiB
Python
import contextlib
|
|
import datetime
|
|
import importlib.metadata
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Annotated, BinaryIO, Optional, Self, Type
|
|
from types import TracebackType
|
|
|
|
import fastapi
|
|
import httpx
|
|
|
|
|
|
# Metadata of the installed distribution whose name equals this module's
# ``__name__``; its ``version`` entry is read when the FastAPI app is built.
__dist__ = importlib.metadata.metadata(__name__)

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

# API token sent to Paperless.  Only declared here; the value is assigned at
# application startup by ``lifespan``.
PAPERLESS_TOKEN: str
# Base URL of the Paperless instance with trailing slashes removed.  Read at
# import time, so a missing PAPERLESS_URL variable raises ``KeyError`` on
# import.
PAPERLESS_URL = os.environ['PAPERLESS_URL'].rstrip('/')

# Router collecting the HTTP endpoints defined below.
router = fastapi.APIRouter()
|
|
|
|
|
|
class Paperless:
|
|
def __init__(self) -> None:
|
|
self.client: Optional[httpx.AsyncClient] = None
|
|
|
|
async def __aenter__(self) -> Self:
|
|
self.client = httpx.AsyncClient()
|
|
return self
|
|
|
|
async def __aexit__(
|
|
self,
|
|
exc_type: Optional[Type[Exception]],
|
|
exc_value: Optional[Exception],
|
|
tb: Optional[TracebackType],
|
|
) -> None:
|
|
if self.client:
|
|
await self.client.aclose()
|
|
self.client = None
|
|
|
|
async def upload(
|
|
self, filename: str, image: BinaryIO, date: datetime.date
|
|
) -> str:
|
|
assert self.client
|
|
log.debug('Sending %s to paperless', filename)
|
|
r = await self.client.post(
|
|
f'{PAPERLESS_URL}/api/documents/post_document/',
|
|
headers={
|
|
'Authorization': f'Token {PAPERLESS_TOKEN}',
|
|
},
|
|
files={
|
|
'document': (filename, image),
|
|
},
|
|
data={
|
|
'created': date.strftime('%Y-%m-%d'),
|
|
},
|
|
)
|
|
r.raise_for_status()
|
|
task_id = r.text.strip()
|
|
log.info(
|
|
'Successfully uploaded %s to paperless; started consume task %s',
|
|
filename,
|
|
task_id,
|
|
)
|
|
return task_id
|
|
|
|
|
|
@router.get('/', response_class=fastapi.responses.HTMLResponse)
def get_form():
    """Return the contents of ``index.html`` located next to this module.

    The file is read as UTF-8 text on every request and sent back as the
    HTML response body.

    Raises:
        fastapi.HTTPException: With status 404 when ``index.html`` does not
            exist.
    """
    path = Path(__file__).with_name('index.html')
    try:
        # A single read that opens and closes the file itself, so no handle
        # is left open just to probe for the file's existence.
        return path.read_text(encoding='utf-8')
    except FileNotFoundError:
        raise fastapi.HTTPException(
            status_code=fastapi.status.HTTP_404_NOT_FOUND,
        ) from None
|
|
|
|
|
|
@router.post(
    '/',
    response_class=fastapi.responses.PlainTextResponse,
    status_code=fastapi.status.HTTP_204_NO_CONTENT,
)
async def upload_receipts(
    images: Annotated[list[fastapi.UploadFile], fastapi.File(alias='image[]')],
    dates: Annotated[list[datetime.date], fastapi.Form(alias='date[]')],
    # notes: Annotated[list[str], fastapi.Form(alias='notes[]')],
):
    """Forward every uploaded image to Paperless with its matching date.

    ``images`` and ``dates`` are paired by position (the ``image[]`` and
    ``date[]`` form fields).  Each upload is attempted even if an earlier
    one failed; failures are logged and reported together at the end.

    Raises:
        fastapi.HTTPException: 400 when the number of images and dates
            differ; 500 when at least one upload failed.
    """
    if len(images) != len(dates):
        msg = (
            f'Number of uploaded images ({len(images)})'
            f' does not match number of date fields ({len(dates)})'
        )
        log.warning('%s', msg)
        raise fastapi.HTTPException(
            status_code=fastapi.status.HTTP_400_BAD_REQUEST,
            detail=msg,
        )

    failures = 0
    async with Paperless() as paperless:
        # Lengths were verified above, so zip pairs every image with a date.
        for idx, (image, date) in enumerate(zip(images, dates)):
            # Fall back to a positional name when the client sent none.
            filename = image.filename or f'image{idx}'
            try:
                await paperless.upload(filename, image.file, date)
            except Exception as e:
                # Best effort: keep going so one bad file does not block
                # the remaining uploads.
                log.error('Failed to send %s to Paperless: %s', filename, e)
                failures += 1

    if failures:
        raise fastapi.HTTPException(
            status_code=fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@contextlib.asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
    """Load the Paperless API token before the application starts serving.

    The token is read from the file named by the ``PAPERLESS_TOKEN_FILE``
    environment variable, stripped of surrounding whitespace and stored in
    the module-level ``PAPERLESS_TOKEN``.  Nothing is done on shutdown.
    """
    global PAPERLESS_TOKEN
    token_file = Path(os.environ['PAPERLESS_TOKEN_FILE'])
    PAPERLESS_TOKEN = token_file.read_text().strip()
    yield
|
|
|
|
|
|
# ASGI application object.  Its version comes from the installed
# distribution metadata, and ``lifespan`` runs at startup to load the
# Paperless token.
app = fastapi.FastAPI(
    version=__dist__['version'],
    lifespan=lifespan,
)
# Register the form and upload endpoints defined on ``router``.
app.include_router(router)
|