341 linhas
9.7 KiB
Python
341 linhas
9.7 KiB
Python
__version_info__ = (1, 1, 1)
|
|
__version__ = '1.1.1'
|
|
|
|
import json
|
|
from collections import defaultdict
|
|
|
|
import requests
|
|
|
|
import rb3compat
|
|
|
|
api_uri = 'http://api.discogs.com'
|
|
user_agent = None
|
|
|
|
|
|
class APIBase(object):
|
|
def __init__(self):
|
|
self._cached_response = None
|
|
self._params = {}
|
|
self._headers = {'accept-encoding': 'gzip, deflate'}
|
|
|
|
def __str__(self):
|
|
return '<%s "%s">' % (self.__class__.__name__, self._id)
|
|
|
|
def __repr__(self):
|
|
return self.__str__().encode('utf-8')
|
|
|
|
def _check_user_agent(self):
|
|
if 'user_agent' in globals() and user_agent is not None:
|
|
self._headers['user-agent'] = user_agent
|
|
return 'user-agent' in self._headers and self._headers.get('user-agent')
|
|
|
|
def _clear_cache(self):
|
|
self._cached_response = None
|
|
|
|
@property
|
|
def _response(self):
|
|
if not self._cached_response:
|
|
if not self._check_user_agent():
|
|
raise DiscogsAPIError('Invalid or no User-Agent set')
|
|
try:
|
|
# gs = GSetting()
|
|
# setting = gs.get_setting(gs.Path.PLUGIN)
|
|
# type_val = setting[gs.PluginKey.PROXY_TYPE]
|
|
# if type_val == 0:
|
|
# type_name = 'http'
|
|
# elif type_val == 1:
|
|
# type_name = 'https'
|
|
# elif type_val == 2:
|
|
# type_name = 'ftp'
|
|
|
|
# proxy_name = setting[gs.PluginKey.PROXY_VALUE]
|
|
proxydict = {'http': ''}
|
|
# proxydict[type_name] = proxy_name
|
|
self._cached_response = requests.get(self._uri, params=self._params, headers=self._headers,
|
|
proxies=proxydict)
|
|
except:
|
|
raise DiscogsAPIError('bad response')
|
|
|
|
return self._cached_response
|
|
|
|
@property
|
|
def _uri_name(self):
|
|
return self.__class__.__name__.lower()
|
|
|
|
@property
|
|
def _uri(self):
|
|
return '%s/%s/%s' % (api_uri, self._uri_name, rb3compat.quote(rb3compat.unicodeencode(self._id, 'utf-8')))
|
|
# return '%s/%s/%s' % (api_uri, self._uri_name, urllib.quote(unicode(self._id).encode('utf-8')))
|
|
|
|
@property
|
|
def data(self):
|
|
if self._response.content and self._response.status_code == 200:
|
|
release_json = json.loads(self._response.content.decode("utf-8"))
|
|
return release_json.get('resp').get(self._uri_name)
|
|
else:
|
|
status_code = self._response.status_code
|
|
raise DiscogsAPIError('%s %s' % (status_code, rb3compat.responses()[status_code]))
|
|
|
|
|
|
class DiscogsAPIError(BaseException):
|
|
pass
|
|
|
|
|
|
def _parse_credits(extraartists):
|
|
"""
|
|
Parse release and track level credits
|
|
"""
|
|
_credits = defaultdict(list)
|
|
for artist in extraartists:
|
|
role = artist.get('role')
|
|
tracks = artist.get('tracks')
|
|
|
|
artist_dict = {'artists': Artist(artist['name'], anv=artist.get('anv'))}
|
|
|
|
if tracks:
|
|
artist_dict['tracks'] = tracks
|
|
|
|
_credits[role].append(artist_dict)
|
|
return _credits
|
|
|
|
|
|
def _class_from_string(api_string):
|
|
class_map = {
|
|
'master': MasterRelease,
|
|
'release': Release,
|
|
'artist': Artist,
|
|
'label': Label
|
|
}
|
|
|
|
return class_map[api_string]
|
|
|
|
|
|
class Artist(APIBase):
|
|
def __init__(self, name, anv=None):
|
|
self._id = name
|
|
self._aliases = []
|
|
self._namevariations = []
|
|
self._releases = []
|
|
self._anv = anv or None
|
|
APIBase.__init__(self)
|
|
|
|
def __str__(self):
|
|
return '<%s "%s">' % (self.__class__.__name__, self._anv + '*' if self._anv else self._id)
|
|
|
|
@property
|
|
def name(self):
|
|
return self._id
|
|
|
|
@property
|
|
def anv(self):
|
|
return self._anv
|
|
|
|
@property
|
|
def aliases(self):
|
|
if not self._aliases:
|
|
for alias in self.data.get('aliases', []):
|
|
self._aliases.append(Artist(alias))
|
|
return self._aliases
|
|
|
|
@property
|
|
def releases(self):
|
|
# TODO: Implement fetch many release IDs
|
|
# return [Release(r.get('id') for r in self.data.get('releases')]
|
|
if not self._releases:
|
|
self._params.update({'releases': '1'})
|
|
self._clear_cache()
|
|
|
|
for r in self.data.get('releases', []):
|
|
self._releases.append(_class_from_string(r['type'])(r['id']))
|
|
return self._releases
|
|
|
|
|
|
class Release(APIBase):
|
|
def __init__(self, id):
|
|
self._id = id
|
|
self._artists = []
|
|
self._master = None
|
|
self._labels = []
|
|
self._credits = None
|
|
self._tracklist = []
|
|
APIBase.__init__(self)
|
|
|
|
@property
|
|
def artists(self):
|
|
if not self._artists:
|
|
self._artists = [Artist(a['name']) for a in self.data.get('artists', [])]
|
|
return self._artists
|
|
|
|
@property
|
|
def master(self):
|
|
if not self._master and self.data.get('master_id'):
|
|
self._master = MasterRelease(self.data.get('master_id'))
|
|
return self._master
|
|
|
|
@property
|
|
def labels(self):
|
|
if not self._labels:
|
|
self._labels = [Label(l['name']) for l in self.data.get('labels', [])]
|
|
return self._labels
|
|
|
|
@property
|
|
def credits(self):
|
|
if not self._credits:
|
|
self._credits = _parse_credits(self.data.get('extraartists', []))
|
|
return self._credits
|
|
|
|
@property
|
|
def tracklist(self):
|
|
if not self._tracklist:
|
|
for track in self.data.get('tracklist', []):
|
|
artists = []
|
|
track['extraartists'] = _parse_credits(track.get('extraartists', []))
|
|
|
|
for artist in track.get('artists', []):
|
|
artists.append(Artist(artist['name'], anv=artist.get('anv')))
|
|
|
|
if artist['join']:
|
|
artists.append(artist['join'])
|
|
track['artists'] = artists
|
|
track['type'] = 'Track' if track['position'] else 'Index Track'
|
|
|
|
self._tracklist.append(track)
|
|
return self._tracklist
|
|
|
|
@property
|
|
def title(self):
|
|
return self.data.get('title')
|
|
|
|
|
|
class MasterRelease(APIBase):
|
|
def __init__(self, id):
|
|
self._id = id
|
|
self._key_release = None
|
|
self._versions = []
|
|
self._artists = []
|
|
APIBase.__init__(self)
|
|
|
|
# Override class name introspection in BaseAPI since it would otherwise return "masterrelease"
|
|
@property
|
|
def _uri_name(self):
|
|
return 'master'
|
|
|
|
@property
|
|
def key_release(self):
|
|
if not self._key_release:
|
|
self._key_release = Release(self.data.get('main_release'))
|
|
return self._key_release
|
|
|
|
@property
|
|
def title(self):
|
|
return self.key_release.data.get('title')
|
|
|
|
@property
|
|
def versions(self):
|
|
if not self._versions:
|
|
for version in self.data.get('versions', []):
|
|
self._versions.append(Release(version.get('id')))
|
|
return self._versions
|
|
|
|
@property
|
|
def artists(self):
|
|
if not self._artists:
|
|
for artist in self.data.get('artists', []):
|
|
self._artists.append(Artist(artist.get('name')))
|
|
return self._artists
|
|
|
|
@property
|
|
def tracklist(self):
|
|
return self.key_release.tracklist
|
|
|
|
|
|
class Label(APIBase):
|
|
def __init__(self, name):
|
|
self._id = name
|
|
self._sublabels = []
|
|
self._parent_label = None
|
|
APIBase.__init__(self)
|
|
|
|
@property
|
|
def sublabels(self):
|
|
if not self._sublabels:
|
|
for sublabel in self.data.get('sublabels', []):
|
|
self._sublabels.append(Label(sublabel))
|
|
return self._sublabels
|
|
|
|
@property
|
|
def parent_label(self):
|
|
if not self._parent_label and self.data.get('parentLabel'):
|
|
self._parent_label = Label(self.data.get('parentLabel'))
|
|
return self._parent_label
|
|
|
|
@property
|
|
def releases(self):
|
|
self._params.update({'releases': '1'})
|
|
self._clear_cache()
|
|
return self.data.get('releases')
|
|
|
|
|
|
class Search(APIBase):
|
|
def __init__(self, query, page=1):
|
|
self._id = query
|
|
self._results = {}
|
|
self._exactresults = []
|
|
self._page = page
|
|
APIBase.__init__(self)
|
|
self._params['q'] = self._id
|
|
self._params['page'] = self._page
|
|
|
|
def _to_object(self, result):
|
|
id = result['title']
|
|
if result['type'] in ('master', 'release'):
|
|
id = result['uri'].split('/')[-1]
|
|
elif result['type'] == 'artist':
|
|
return Artist(id, anv=result.get('anv'))
|
|
return _class_from_string(result['type'])(id)
|
|
|
|
@property
|
|
def _uri(self):
|
|
return '%s/%s' % (api_uri, self._uri_name)
|
|
|
|
@property
|
|
def exactresults(self):
|
|
if not self.data:
|
|
return []
|
|
|
|
if not self._exactresults:
|
|
for result in self.data.get('exactresults', []):
|
|
self._exactresults.append(self._to_object(result))
|
|
return self._exactresults
|
|
|
|
def results(self, page=1):
|
|
page_key = 'page%s' % page
|
|
|
|
if page != self._page:
|
|
if page > self.pages:
|
|
raise DiscogsAPIError('Page number exceeds maximum number of pages returned')
|
|
self._params['page'] = page
|
|
self._clear_cache()
|
|
|
|
if not self.data:
|
|
return []
|
|
|
|
if page_key not in self._results:
|
|
self._results[page_key] = []
|
|
for result in self.data['searchresults']['results']:
|
|
self._results[page_key].append(self._to_object(result))
|
|
|
|
return self._results[page_key]
|
|
|
|
@property
|
|
def numresults(self):
|
|
if not self.data:
|
|
return 0
|
|
return int(self.data['searchresults'].get('numResults', 0))
|
|
|
|
@property
|
|
def pages(self):
|
|
if not self.data:
|
|
return 0
|
|
return (self.numresults / 20) + 1
|