Arquivos
coverart-search-providers/discogs_client.py
T
fossfreedom 00dee49e58 PEP8 cleanup
2015-10-04 23:20:55 +01:00

341 linhas
9.7 KiB
Python

__version_info__ = (1, 1, 1)
__version__ = '1.1.1'
import json
from collections import defaultdict
import requests
import rb3compat
api_uri = 'http://api.discogs.com'
user_agent = None
class APIBase(object):
def __init__(self):
self._cached_response = None
self._params = {}
self._headers = {'accept-encoding': 'gzip, deflate'}
def __str__(self):
return '<%s "%s">' % (self.__class__.__name__, self._id)
def __repr__(self):
return self.__str__().encode('utf-8')
def _check_user_agent(self):
if 'user_agent' in globals() and user_agent is not None:
self._headers['user-agent'] = user_agent
return 'user-agent' in self._headers and self._headers.get('user-agent')
def _clear_cache(self):
self._cached_response = None
@property
def _response(self):
if not self._cached_response:
if not self._check_user_agent():
raise DiscogsAPIError('Invalid or no User-Agent set')
try:
# gs = GSetting()
# setting = gs.get_setting(gs.Path.PLUGIN)
# type_val = setting[gs.PluginKey.PROXY_TYPE]
# if type_val == 0:
# type_name = 'http'
# elif type_val == 1:
# type_name = 'https'
# elif type_val == 2:
# type_name = 'ftp'
# proxy_name = setting[gs.PluginKey.PROXY_VALUE]
proxydict = {'http': ''}
# proxydict[type_name] = proxy_name
self._cached_response = requests.get(self._uri, params=self._params, headers=self._headers,
proxies=proxydict)
except:
raise DiscogsAPIError('bad response')
return self._cached_response
@property
def _uri_name(self):
return self.__class__.__name__.lower()
@property
def _uri(self):
return '%s/%s/%s' % (api_uri, self._uri_name, rb3compat.quote(rb3compat.unicodeencode(self._id, 'utf-8')))
# return '%s/%s/%s' % (api_uri, self._uri_name, urllib.quote(unicode(self._id).encode('utf-8')))
@property
def data(self):
if self._response.content and self._response.status_code == 200:
release_json = json.loads(self._response.content.decode("utf-8"))
return release_json.get('resp').get(self._uri_name)
else:
status_code = self._response.status_code
raise DiscogsAPIError('%s %s' % (status_code, rb3compat.responses()[status_code]))
class DiscogsAPIError(BaseException):
pass
def _parse_credits(extraartists):
"""
Parse release and track level credits
"""
_credits = defaultdict(list)
for artist in extraartists:
role = artist.get('role')
tracks = artist.get('tracks')
artist_dict = {'artists': Artist(artist['name'], anv=artist.get('anv'))}
if tracks:
artist_dict['tracks'] = tracks
_credits[role].append(artist_dict)
return _credits
def _class_from_string(api_string):
class_map = {
'master': MasterRelease,
'release': Release,
'artist': Artist,
'label': Label
}
return class_map[api_string]
class Artist(APIBase):
def __init__(self, name, anv=None):
self._id = name
self._aliases = []
self._namevariations = []
self._releases = []
self._anv = anv or None
APIBase.__init__(self)
def __str__(self):
return '<%s "%s">' % (self.__class__.__name__, self._anv + '*' if self._anv else self._id)
@property
def name(self):
return self._id
@property
def anv(self):
return self._anv
@property
def aliases(self):
if not self._aliases:
for alias in self.data.get('aliases', []):
self._aliases.append(Artist(alias))
return self._aliases
@property
def releases(self):
# TODO: Implement fetch many release IDs
# return [Release(r.get('id') for r in self.data.get('releases')]
if not self._releases:
self._params.update({'releases': '1'})
self._clear_cache()
for r in self.data.get('releases', []):
self._releases.append(_class_from_string(r['type'])(r['id']))
return self._releases
class Release(APIBase):
def __init__(self, id):
self._id = id
self._artists = []
self._master = None
self._labels = []
self._credits = None
self._tracklist = []
APIBase.__init__(self)
@property
def artists(self):
if not self._artists:
self._artists = [Artist(a['name']) for a in self.data.get('artists', [])]
return self._artists
@property
def master(self):
if not self._master and self.data.get('master_id'):
self._master = MasterRelease(self.data.get('master_id'))
return self._master
@property
def labels(self):
if not self._labels:
self._labels = [Label(l['name']) for l in self.data.get('labels', [])]
return self._labels
@property
def credits(self):
if not self._credits:
self._credits = _parse_credits(self.data.get('extraartists', []))
return self._credits
@property
def tracklist(self):
if not self._tracklist:
for track in self.data.get('tracklist', []):
artists = []
track['extraartists'] = _parse_credits(track.get('extraartists', []))
for artist in track.get('artists', []):
artists.append(Artist(artist['name'], anv=artist.get('anv')))
if artist['join']:
artists.append(artist['join'])
track['artists'] = artists
track['type'] = 'Track' if track['position'] else 'Index Track'
self._tracklist.append(track)
return self._tracklist
@property
def title(self):
return self.data.get('title')
class MasterRelease(APIBase):
def __init__(self, id):
self._id = id
self._key_release = None
self._versions = []
self._artists = []
APIBase.__init__(self)
# Override class name introspection in BaseAPI since it would otherwise return "masterrelease"
@property
def _uri_name(self):
return 'master'
@property
def key_release(self):
if not self._key_release:
self._key_release = Release(self.data.get('main_release'))
return self._key_release
@property
def title(self):
return self.key_release.data.get('title')
@property
def versions(self):
if not self._versions:
for version in self.data.get('versions', []):
self._versions.append(Release(version.get('id')))
return self._versions
@property
def artists(self):
if not self._artists:
for artist in self.data.get('artists', []):
self._artists.append(Artist(artist.get('name')))
return self._artists
@property
def tracklist(self):
return self.key_release.tracklist
class Label(APIBase):
def __init__(self, name):
self._id = name
self._sublabels = []
self._parent_label = None
APIBase.__init__(self)
@property
def sublabels(self):
if not self._sublabels:
for sublabel in self.data.get('sublabels', []):
self._sublabels.append(Label(sublabel))
return self._sublabels
@property
def parent_label(self):
if not self._parent_label and self.data.get('parentLabel'):
self._parent_label = Label(self.data.get('parentLabel'))
return self._parent_label
@property
def releases(self):
self._params.update({'releases': '1'})
self._clear_cache()
return self.data.get('releases')
class Search(APIBase):
def __init__(self, query, page=1):
self._id = query
self._results = {}
self._exactresults = []
self._page = page
APIBase.__init__(self)
self._params['q'] = self._id
self._params['page'] = self._page
def _to_object(self, result):
id = result['title']
if result['type'] in ('master', 'release'):
id = result['uri'].split('/')[-1]
elif result['type'] == 'artist':
return Artist(id, anv=result.get('anv'))
return _class_from_string(result['type'])(id)
@property
def _uri(self):
return '%s/%s' % (api_uri, self._uri_name)
@property
def exactresults(self):
if not self.data:
return []
if not self._exactresults:
for result in self.data.get('exactresults', []):
self._exactresults.append(self._to_object(result))
return self._exactresults
def results(self, page=1):
page_key = 'page%s' % page
if page != self._page:
if page > self.pages:
raise DiscogsAPIError('Page number exceeds maximum number of pages returned')
self._params['page'] = page
self._clear_cache()
if not self.data:
return []
if page_key not in self._results:
self._results[page_key] = []
for result in self.data['searchresults']['results']:
self._results[page_key].append(self._to_object(result))
return self._results[page_key]
@property
def numresults(self):
if not self.data:
return 0
return int(self.data['searchresults'].get('numResults', 0))
@property
def pages(self):
if not self.data:
return 0
return (self.numresults / 20) + 1