scripts/ripcd.py

299 lines
8.3 KiB
Python
Executable File

#!/usr/bin/env python3.4
from lxml import etree
import asyncio
import aiohttp
import argparse
import fnmatch
import functools
import glob
import multiprocessing
import mutagen
import os
import re
import sys
import urllib.parse
try:
from titlecase import titlecase
except ImportError:
titlecase = str.title
class TocProtocol(asyncio.SubprocessProtocol):
    """Parse the title listing printed by the ripper into a table of contents.

    ``Album title:`` lines fill in ``toc.album`` and ``toc.artist`` (using
    ``toc.ALBUM_TITLE_RE``); ``Track`` lines append a title to ``toc.tracks``.

    ``done`` is a future that resolves to ``True`` once the process has exited
    *and* every captured output pipe has been closed, so that output which is
    delivered after the exit notification is still parsed.
    """

    # Descriptors of the child's stdout and stderr.
    OUTPUT_FDS = (1, 2)

    def __init__(self, toc):
        self.toc = toc
        self.done = asyncio.Future()
        # Bytes of a line that has not been terminated yet, per descriptor.
        self._partial = {}
        # Output pipes that are connected and have not reported EOF yet.
        self._open_pipes = set()
        self._exited = False

    def connection_made(self, transport):
        """Remember which output descriptors are actually connected as pipes."""
        self._open_pipes = set(
            fd for fd in self.OUTPUT_FDS
            if transport.get_pipe_transport(fd) is not None
        )

    def pipe_data_received(self, fd, data):
        """Buffer *data* and parse every line that is now complete.

        Chunks arrive at arbitrary boundaries, so the bytes after the last
        newline are kept until the rest of that line shows up.
        """
        buffered = self._partial.pop(fd, b'') + data
        complete, newline, rest = buffered.rpartition(b'\n')
        if rest:
            self._partial[fd] = rest
        if newline:
            self._parse(complete)

    def pipe_connection_lost(self, fd, exc):
        """Parse any unterminated last line of *fd* and note the pipe as closed."""
        self._flush(fd)
        self._open_pipes.discard(fd)
        self._finish_if_complete()

    def process_exited(self):
        # asyncio may call this before the final pipe_data_received(), so the
        # future is only resolved once the output pipes are closed as well.
        self._exited = True
        self._finish_if_complete()

    def _flush(self, fd):
        """Parse whatever is still buffered for *fd*."""
        leftover = self._partial.pop(fd, b'')
        if leftover:
            self._parse(leftover)

    def _finish_if_complete(self):
        """Resolve ``done`` once the process exited and all pipes are closed."""
        if not self._exited or self._open_pipes:
            return
        for fd in list(self._partial):
            self._flush(fd)
        # The future may already be cancelled by whoever was waiting on it.
        if not self.done.done():
            self.done.set_result(True)

    def _parse(self, raw):
        """Decode *raw* and feed each line to the line parser."""
        # Titles are not guaranteed to be valid UTF-8; never crash on them.
        for line in raw.decode(errors='replace').splitlines():
            self._parse_line(line)

    def _parse_line(self, line):
        """Update the table of contents from a single line of output."""
        if line.startswith('Album title:'):
            m = self.toc.ALBUM_TITLE_RE.match(line)
            if m:
                self.toc.album = m.group('album')
                self.toc.artist = m.group('artist')
        elif line.startswith('Track'):
            _, colon, title = line.partition(':')
            # Lines such as a bare "Track ..." heading carry no title.
            if colon:
                self.toc.tracks.append(title.strip().strip("'"))
class TableOfContents(object):
    """Album title, artist and track titles of a CD."""

    # Matches e.g. "Album title: 'Some Album'   [from Some Artist]".
    ALBUM_TITLE_RE = re.compile(
        r'''^Album title: '(?P<album>.*)'\s+\[from (?P<artist>.*)\]\s*$'''
    )

    def __init__(self):
        self.artist = None
        self.album = None
        self.tracks = []

    @classmethod
    @asyncio.coroutine
    def from_device(cls, device=None):
        """Run ``icedax`` in info-only mode and build a table from its output.

        :param device: optional device passed to ``icedax --device``.
        :returns: a new instance populated by :class:`TocProtocol`.
        """
        cmd = ['icedax']
        if device:
            cmd.extend(('--device', device))
        cmd.extend((
            '--info-only',
            '--no-infofile',
            '--verbose-level', 'titles',
            '--quiet',
            '--silent-scsi',
        ))
        env = os.environ.copy()
        # The parser expects untranslated messages.  LC_ALL takes precedence
        # over LC_MESSAGES and LANG, so it has to be overridden as well or an
        # inherited LC_ALL would bring the translations back.
        env['LC_ALL'] = env['LC_MESSAGES'] = env['LANG'] = 'C'
        toc = cls()
        loop = asyncio.get_event_loop()
        factory = functools.partial(TocProtocol, toc)
        trans, proto = yield from loop.subprocess_exec(factory, *cmd, env=env)
        try:
            yield from proto.done
        finally:
            # Release the pipes even when waiting is interrupted.
            trans.close()
        return toc
class Track(object):
    """A ripped track: a source audio file plus tags read from its ``.inf``."""

    # Keys of the ``.inf`` file mapped to the tag names written to the output.
    CDTEXT_TAGS = {
        'Albumperformer': 'albumartist',
        'Albumtitle': 'album',
        'Performer': 'artist',
        'Tracknumber': 'tracknumber',
        'Tracktitle': 'title',
    }
    FILENAME_FORMAT = '{tracknumber:0>2} {artist} - {title}.ogg'
    # Tags FILENAME_FORMAT cannot do without.
    FILENAME_TAGS = ('tracknumber', 'artist', 'title')

    def __init__(self):
        self.filename = None
        self.tags = {}

    @property
    def outfile(self):
        """Name of the encoded file.

        Built from the tags when all tags used by ``FILENAME_FORMAT`` are
        present and non-empty; otherwise the source name with an ``.ogg``
        extension.
        """
        if all(self.tags.get(tag) for tag in self.FILENAME_TAGS):
            name = self.FILENAME_FORMAT.format(**self.tags)
            # A tag such as "AC/DC" must not turn into a directory component.
            for sep in (os.sep, os.altsep):
                if sep:
                    name = name.replace(sep, '_')
            return name
        return os.path.splitext(self.filename)[0] + '.ogg'

    @classmethod
    def from_file(cls, filename):
        """Create a track for the audio file *filename*."""
        self = cls()
        self.filename = filename
        return self

    @classmethod
    def _parse_inf_line(cls, line):
        """Return ``(tag, value)`` for a recognised ``key=value`` line.

        Returns ``None`` for comments, unknown keys and empty values.
        """
        # Only the first "=" separates key and value; titles may contain more.
        key, equals, value = line.partition('=')
        if not equals:
            return None
        # Comment lines never match a known key, so they drop out here.
        tag = cls.CDTEXT_TAGS.get(key.strip())
        if tag is None:
            return None
        value = value.strip()
        if value.startswith("'"):
            # Quoted value: a "#" inside the quotes belongs to the text, so
            # only what follows the closing quote is discarded.
            closing = value.rfind("'")
            if closing > 0:
                value = value[:closing + 1]
        else:
            value = value.split('#')[0]
        value = value.strip().strip("'")
        if not value:
            return None
        return tag, titlecase(value)

    def _parse_inf(self):
        """Fill ``self.tags`` from the ``.inf`` file next to the audio file."""
        assert self.filename
        basename = os.path.splitext(self.filename)[0]
        infname = '{}.inf'.format(basename)
        try:
            # Undecodable bytes must not abort the whole track.
            inf = open(infname, errors='replace')
        except OSError as e:
            sys.stderr.write('Could not read track info: {}\n'.format(e))
            return
        with inf:
            for line in inf:
                parsed = self._parse_inf_line(line)
                if parsed:
                    tag, value = parsed
                    self.tags[tag] = value

    @asyncio.coroutine
    def to_vorbis(self, lock=None):
        """Encode the track with ``oggenc`` and tag the result.

        :param lock: optional lock/semaphore held while the encoder runs,
            used by callers to limit the number of parallel encoders.
        """
        assert self.filename
        loop = asyncio.get_event_loop()
        yield from loop.run_in_executor(None, self._parse_inf)
        if lock is not None:
            yield from lock.acquire()
        try:
            print('Encoding {} as {}'.format(self.filename, self.outfile))
            cmd = ['oggenc', '-q', '9', '-Q', '-o', self.outfile, self.filename]
            p = yield from asyncio.create_subprocess_exec(*cmd)
            yield from p.wait()
        finally:
            # Always hand the slot back, or the remaining tracks never start.
            if lock is not None:
                lock.release()
        if p.returncode != 0:
            sys.stderr.write('Failed to encode {}\n'.format(self.filename))
            # There is no usable output file to tag.
            return
        if self.tags:
            yield from loop.run_in_executor(None, self.write_tags)

    def write_tags(self):
        """Write ``self.tags`` into the encoded file."""
        tags = mutagen.File(self.outfile, easy=True)
        if tags is None:
            # mutagen could not identify the file type.
            sys.stderr.write('Could not tag {}\n'.format(self.outfile))
            return
        tags.update(self.tags)
        tags.save()
@asyncio.coroutine
def fetch_album_art():
    """Download the front cover of the disc into ``folder.jpg``.

    The disc ID is read from ``audio.cdindex`` in the current directory, the
    matching releases are looked up on MusicBrainz and the front image of the
    first release that reports cover art is fetched from the Cover Art
    Archive.  Problems are reported on stderr; nothing is returned.
    """
    loop = asyncio.get_event_loop()
    try:
        # Binary mode: the XML parser honours the encoding declared in the
        # document instead of relying on the locale's text decoding.
        with open('audio.cdindex', 'rb') as f:
            tree = yield from loop.run_in_executor(None, etree.parse, f)
    except (OSError, etree.XMLSyntaxError) as e:
        sys.stderr.write('Could not read CD index: {}\n'.format(e))
        return
    ids = tree.xpath('//DiskId/Id')
    discid = (ids[0].text or '').strip() if ids else ''
    if not discid:
        sys.stderr.write('Missing disc ID in CD index\n')
        return
    headers = {
        'Accept': 'application/json',
    }
    url = 'http://musicbrainz.org/ws/2/discid/{}'.format(
        urllib.parse.quote(discid, safe=''))
    res = yield from aiohttp.request('GET', url, headers=headers)
    try:
        metadata = yield from res.json()
    finally:
        # Close the connection even if the body is not valid JSON.
        res.close()
    for release in metadata.get('releases', ()):
        if release.get('cover-art-archive', {}).get('count', 0):
            break
    else:
        sys.stderr.write('No cover artwork available\n')
        return
    url = 'http://coverartarchive.org/release/{}/front'.format(
        release['id'])
    res = yield from aiohttp.request('GET', url)
    try:
        if res.status != 200:
            # Do not store an error page under the name of the image.
            sys.stderr.write(
                'Could not download album art: HTTP {}\n'.format(res.status))
            return
        with open('folder.jpg', 'wb') as f:
            while True:
                data = yield from res.content.read(8192)
                if not data:
                    break
                f.write(data)
    except OSError as e:
        sys.stderr.write('Could not save album art: {}\n'.format(e))
    finally:
        res.close()
@asyncio.coroutine
def rip_info(device):
    """Run ``icedax --info-only`` and report a non-zero exit on stderr.

    :param device: optional device passed to ``icedax --device``; a falsy
        value leaves the option out.

    NOTE(review): presumably this produces the info/index files that the
    other steps read later; confirm against the icedax manual.
    """
    device_args = ['--device', device] if device else []
    argv = ['icedax'] + device_args + [
        '--info-only',
        '--quiet',
        '--silent-scsi',
    ]
    proc = yield from asyncio.create_subprocess_exec(*argv)
    yield from proc.wait()
    if proc.returncode == 0:
        return
    sys.stderr.write('Failed to rip CD info\n')
@asyncio.coroutine
def rip_tracks(device, num_encoders=None):
    """Rip all tracks with ``icedax`` and encode the resulting WAV files.

    :param device: optional device passed to ``icedax --device``.
    :param num_encoders: maximum number of encoders running at once, as an
        integer or a numeric string; a missing or non-positive value selects
        the number of CPUs.
    """
    # The command line may hand the count over as a string, which
    # asyncio.Semaphore cannot compare against zero.
    limit = int(num_encoders) if num_encoders else 0
    if limit < 1:
        limit = multiprocessing.cpu_count()
    cmd = ['icedax']
    if device:
        cmd.extend(('--device', device))
    cmd.extend((
        '--alltracks',
        '--no-infofile',
        '--verbose-level', 'summary',
        '--silent-scsi',
    ))
    p = yield from asyncio.create_subprocess_exec(*cmd)
    yield from p.wait()
    if p.returncode != 0:
        sys.stderr.write('Failed to rip CD tracks\n')
        return
    filenames = sorted(glob.glob('*.wav'))
    if not filenames:
        # asyncio.wait() rejects an empty collection.
        sys.stderr.write('No tracks were ripped\n')
        return
    lock = asyncio.Semaphore(limit)
    loop = asyncio.get_event_loop()
    tasks = [
        loop.create_task(Track.from_file(filename).to_vorbis(lock))
        for filename in filenames
    ]
    yield from asyncio.wait(tasks)
    # asyncio.wait() does not raise task errors; report them per track.
    for filename, task in zip(filenames, tasks):
        error = task.exception()
        if error is not None:
            sys.stderr.write(
                'Failed to encode {}: {!r}\n'.format(filename, error))
def cleanup():
    """Remove the intermediate files from the current directory.

    Deletes every ``*.wav`` and ``*.inf`` entry as well as ``audio.cddb`` and
    ``audio.cdindex``.
    """
    temp_patterns = ('*.wav', '*.inf')
    temp_names = ('audio.cddb', 'audio.cdindex')
    for entry in os.listdir():
        is_temporary = entry in temp_names or any(
            fnmatch.fnmatch(entry, pattern) for pattern in temp_patterns
        )
        if is_temporary:
            os.unlink(entry)
@asyncio.coroutine
def rip_cd(args):
    """Rip a CD: read its titles, rip the info, then fetch art and rip tracks.

    :param args: parsed command line providing ``device``, ``num_encoders``
        and ``cleanup``.

    The cover download and the track ripping run concurrently.  An exception
    raised by either is reported on stderr instead of being dropped.
    """
    loop = asyncio.get_event_loop()
    toc = yield from TableOfContents.from_device(args.device)
    print('Found CD: {} by {}'.format(toc.album, toc.artist))
    yield from rip_info(args.device)
    jobs = (
        ('Fetching album art', loop.create_task(fetch_album_art())),
        ('Ripping tracks',
         loop.create_task(rip_tracks(args.device, args.num_encoders))),
    )
    yield from asyncio.wait([task for _, task in jobs])
    # asyncio.wait() never raises what the tasks raised; surface it here.
    for label, task in jobs:
        error = task.exception()
        if error is not None:
            sys.stderr.write('{} failed: {!r}\n'.format(label, error))
    if args.cleanup:
        cleanup()
def parse_args(argv=None):
    """Parse the command line.

    :param argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    :returns: namespace with ``num_encoders`` (int or ``None``), ``cleanup``
        (bool) and ``device`` (str or ``None``).
    """
    parser = argparse.ArgumentParser()
    # type=int: the value is used as a semaphore count, not as text.
    parser.add_argument('--num-encoders', metavar='COUNT', type=int,
                        help='Number of simultaneous encoder processes')
    parser.add_argument('--no-clean', dest='cleanup', action='store_false',
                        default=True,
                        help='Do not remove temporary files')
    parser.add_argument('device', nargs='?',
                        help='CD-ROM device to use')
    args = parser.parse_args(argv)
    if args.num_encoders is not None and args.num_encoders < 1:
        parser.error('--num-encoders must be at least 1')
    return args
def main():
    """Script entry point: parse the command line and run the rip to the end."""
    args = parse_args()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(rip_cd(args))
    finally:
        # Close the loop even when ripping raised.
        loop.close()


if __name__ == '__main__':
    main()