Initial commit

master
Dustin 2019-04-30 14:58:56 -05:00
commit 381f7ac453
4 changed files with 235 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
/.eggs/
/.venv/
*.egg-info/
__pycache__/
*.py[co]

6
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,6 @@
{
"python.pythonPath": ".venv/bin/python3",
"files.watcherExclude": {
"**/.venv/**": true
}
}

18
setup.py Normal file
View File

@ -0,0 +1,18 @@
from setuptools import find_packages, setup
setup(
name='pass2bw',
use_scm_version=True,
description='Import passwords from pass into Bitwarden using bw',
author='Dustin C. Hatch',
author_email='dustin@hatch.name',
license='Apache-2',
py_modules=['pass2bw'],
package_dir={'': 'src'},
setup_requires=['setuptools_scm'],
entry_points={
'console_scripts': [
'pass2bw=pass2bw:main',
],
},
)

206
src/pass2bw.py Normal file
View File

@ -0,0 +1,206 @@
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
)
import base64
import json
import logging
import os
import subprocess
log = logging.getLogger('pass2bw')
class BitwardenError(Exception):
pass
class Vault:
def __init__(self) -> None:
self.folders: List[Dict[str, str]] = []
self.items: List[Dict[str, Any]] = []
def create_folder(self, name) -> Dict[str, str]:
log.info('Creating folder %s', name)
data = {
'name': name,
}
cmd = ['bw', 'create', 'folder']
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
res = p.communicate(
base64.b64encode(json.dumps(data).encode())
)[0]
if p.returncode == 0:
folder = json.loads(res)
self.folders.append(folder)
return folder
else:
raise BitwardenError(res.rstrip(b'\n').decode())
def create_item(self, data) -> Dict[str, Any]:
cmd = ['bw', 'create', 'item']
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
res = p.communicate(
base64.b64encode(json.dumps(data).encode())
)[0]
if p.returncode == 0:
item = json.loads(res)
self.items.append(item)
return item
else:
raise BitwardenError(res.rstrip(b'\n').decode())
def get_folders(self) -> None:
cmd = ['bw', 'list', 'folders']
p = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
)
data = p.communicate()[0]
if p.returncode == 0:
return json.loads(data)
else:
raise BitwardenError(data.rstrip(b'\n').decode())
def get_items(self) -> None:
cmd = ['bw', 'list', 'items']
p = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
)
data = p.communicate()[0]
if p.returncode == 0:
return json.loads(data)
else:
raise BitwardenError(data.rstrip(b'\n').decode())
def load(self) -> None:
self.folders = self.get_folders()
self.items = self.get_items()
def sync(self) -> None:
cmd = ['bw', 'sync']
p = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
)
res = p.communicate()[0]
if p.returncode != 0:
raise BitwardenError(res.rstrip(b'\n').decode())
class PassEntry:
def __init__(self, key: str) -> None:
self.key = key
self.name = key.rsplit(os.sep, 1)[-1]
self.password: str
self.username: Optional[str] = None
self.custom_fields: Dict[str, str] = {}
@classmethod
def from_key(cls, key: str) -> 'PassEntry':
self = cls(key)
log.debug('Loading %s', key)
cmd = ['pass', 'show', key]
p = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
)
try:
self.password = p.stdout.readline().rstrip(b'\n').decode()
for line in iter(p.stdout.readline, b''):
if b':' in line:
key, value = line.decode().split(':')
if key == 'login':
self.username = value
else:
self.custom_fields[key] = value
finally:
p.stdout.close()
p.wait()
return self
@classmethod
def list_all(cls) -> Iterator['PassEntry']:
topdir = os.path.expanduser('~/.password-store')
for basedir, dirnames, filenames in os.walk(topdir):
for dirname in dirnames:
if dirname.startswith('.'):
log.debug('Ignoring %s', dirname)
dirnames.remove(dirname)
for filename in filenames:
basename, ext = os.path.splitext(filename)
if ext != '.gpg':
log.debug('Skipping %s', filename)
continue
path = os.path.join(basedir, basename)
key = os.path.relpath(path, topdir)
yield cls.from_key(key)
def migrate(self, vault: Vault, folder: str = None) -> None:
if folder:
for f in vault.folders:
if f['name'] == folder:
folder_id = f['id']
break
else:
f = vault.create_folder(folder)
folder_id = f['id']
else:
folder_id = None
log.info(
'Migrating %s to Bitwarden (folder %s [%s])',
self.key, folder, folder_id,
)
data = self.to_json()
data['folderId'] = folder_id
vault.create_item(data)
def to_json(self) -> Dict[str, Any]:
data = {
'name': self.key,
'type': 1,
'login': {
'uris': [],
'password': self.password,
}
}
if self.username:
data['login']['username']= self.username
return data
def main():
logging.basicConfig(level=logging.DEBUG)
vault = Vault()
vault.sync()
vault.load()
for entry in PassEntry.list_all():
for item in vault.items:
if item['name'] == entry.key:
log.warning('Skiping %s', entry.key)
break
else:
entry.migrate(vault, 'Imported')
if __name__ == '__main__':
main()