Moved deezer api over to deezer-py pipy package; Version bump to 2.0.1

This commit is contained in:
RemixDev 2020-11-19 22:08:35 +01:00
parent 8840855a96
commit 94f3bc95c3
No known key found for this signature in database
GPG Key ID: B33962B465BDB51C
13 changed files with 134 additions and 918 deletions

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python3
__version__ = "1.5.21"
__version__ = "2.0.1"
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/79.0.3945.130 Safari/537.36"

View File

@ -1,855 +0,0 @@
import eventlet
import binascii
import datetime
import time
requests = eventlet.import_patched('requests')
from Cryptodome.Cipher import Blowfish, AES
from Cryptodome.Hash import MD5
from Cryptodome.Util.Padding import pad
import re
import json
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/79.0.3945.130 Safari/537.36"
class LyricsStatus():
"""Explicit Content Lyrics"""
NOT_EXPLICIT = 0
"""Not Explicit"""
EXPLICIT = 1
"""Explicit"""
UNKNOWN = 2
"""Unknown"""
EDITED = 3
"""Edited"""
PARTIALLY_EXPLICIT = 4
"""Partially Explicit (Album "lyrics" only)"""
PARTIALLY_UNKNOWN = 5
"""Partially Unknown (Album "lyrics" only)"""
NO_ADVICE = 6
"""No Advice Available"""
PARTIALLY_NO_ADVICE = 7
"""Partially No Advice Available (Album "lyrics" only)"""
class TrackFormats():
"""Number associtation for formats"""
FLAC = 9
MP3_320 = 3
MP3_128 = 1
MP4_RA3 = 15
MP4_RA2 = 14
MP4_RA1 = 13
DEFAULT = 8
LOCAL = 0
class Deezer:
def __init__(self):
self.api_url = "http://www.deezer.com/ajax/gw-light.php"
self.legacy_api_url = "https://api.deezer.com/"
self.http_headers = {
"User-Agent": USER_AGENT_HEADER,
"Accept-Language": None
}
self.album_pictures_host = "https://e-cdns-images.dzcdn.net/images/cover/"
self.artist_pictures_host = "https://e-cdns-images.dzcdn.net/images/artist/"
self.user = {}
self.family = False
self.childs = []
self.selectedAccount = 0
self.favorites = {
'songs': [],
'albums': [],
'artists': [],
'playlists': []
}
self.checksums = None
self.session = requests.Session()
self.mobile_session = requests.Session()
self.logged_in = False
self.session.mount('http://', requests.adapters.HTTPAdapter(pool_maxsize=100))
self.session.mount('https://', requests.adapters.HTTPAdapter(pool_maxsize=100))
def set_accept_language(self, lang):
self.http_headers['Accept-Language'] = lang
def get_accept_language(self):
return self.http_headers['Accept-Language']
def get_token(self):
token_data = self.gw_api_call('deezer.getUserData')
return token_data["results"]["checkForm"]
def get_track_filesizes(self, sng_id):
try:
response = self.mobile_session.post("https://www.deezer.com/",
headers=self.http_headers,
timeout=30)
guest_sid = self.mobile_session.cookies.get('sid')
site = self.mobile_session.post(
"https://api.deezer.com/1.0/gateway.php",
params={
'api_key': "4VCYIJUCDLOUELGD1V8WBVYBNVDYOXEWSLLZDONGBBDFVXTZJRXPR29JRLQFO6ZE",
'sid': guest_sid,
'input': '3',
'output': '3',
'method': 'song_getData'
},
timeout=30,
json={'sng_id': sng_id},
headers=self.http_headers
)
result_json = site.json()
except:
eventlet.sleep(2)
return self.get_track_filesizes(sng_id)
if len(result_json['error']):
raise APIError(json.dumps(result_json['error']))
response = result_json.get("results")
filesizes = {}
for key, value in response.items():
if key.startswith("FILESIZE_"):
filesizes[key] = value
filesizes[key+"_TESTED"] = False
return filesizes
def gw_api_call(self, method, args=None, params=None):
if args is None:
args = {}
if params is None:
params = {}
p = {'api_version': "1.0",
'api_token': 'null' if method == 'deezer.getUserData' else self.get_token(),
'input': '3',
'method': method}
p.update(params)
try:
result = self.session.post(
self.api_url,
params=p,
timeout=30,
json=args,
headers=self.http_headers
)
result_json = result.json()
except:
eventlet.sleep(2)
return self.gw_api_call(method, args, params)
if len(result_json['error']):
raise APIError(json.dumps(result_json['error']))
return result.json()
def api_call(self, method, args=None):
if args is None:
args = {}
try:
result = self.session.get(
self.legacy_api_url + method,
params=args,
headers=self.http_headers,
timeout=30
)
result_json = result.json()
except:
eventlet.sleep(2)
return self.api_call(method, args)
if 'error' in result_json.keys():
if 'code' in result_json['error'] and result_json['error']['code'] == 4:
eventlet.sleep(5)
return self.api_call(method, args)
raise APIError(json.dumps(result_json['error']))
return result_json
def login(self, email, password, re_captcha_token, child=0):
check_form_login = self.gw_api_call("deezer.getUserData")
login = self.session.post(
"https://www.deezer.com/ajax/action.php",
data={
'type': 'login',
'mail': email,
'password': password,
'checkFormLogin': check_form_login['results']['checkFormLogin'],
'reCaptchaToken': re_captcha_token
},
headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', **self.http_headers}
)
if 'success' not in login.text:
self.logged_in = False
return False
user_data = self.gw_api_call("deezer.getUserData")
self.family = user_data["results"]["USER"]["MULTI_ACCOUNT"]["ENABLED"]
if self.family:
self.childs = self.get_child_accounts_gw()
if len(self.childs)-1 >= child:
self.user = {
'id': self.childs[child]["USER_ID"],
'name': self.childs[child]["BLOG_NAME"],
'picture': self.childs[child]["USER_PICTURE"] if "USER_PICTURE" in self.childs[child] else ""
}
self.selectedAccount = child
else:
self.user = {
'id': user_data["results"]["USER"]["USER_ID"],
'name': user_data["results"]["USER"]["BLOG_NAME"],
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
"USER"] else ""
}
self.selectedAccount = 0
else:
self.user = {
'id': user_data["results"]["USER"]["USER_ID"],
'name': user_data["results"]["USER"]["BLOG_NAME"],
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
"USER"] else ""
}
self.logged_in = True
return True
def login_via_arl(self, arl, child=0):
arl = arl.strip()
cookie_obj = requests.cookies.create_cookie(
domain='.deezer.com',
name='arl',
value=arl,
path="/",
rest={'HttpOnly': True}
)
self.session.cookies.set_cookie(cookie_obj)
user_data = self.gw_api_call("deezer.getUserData")
if user_data["results"]["USER"]["USER_ID"] == 0:
self.logged_in = False
return False
self.family = user_data["results"]["USER"]["MULTI_ACCOUNT"]["ENABLED"]
if self.family:
self.childs = self.get_child_accounts_gw()
if len(self.childs)-1 >= child:
self.user = {
'id': self.childs[child]["USER_ID"],
'name': self.childs[child]["BLOG_NAME"],
'picture': self.childs[child]["USER_PICTURE"] if "USER_PICTURE" in self.childs[child] else ""
}
self.selectedAccount = child
else:
self.user = {
'id': user_data["results"]["USER"]["USER_ID"],
'name': user_data["results"]["USER"]["BLOG_NAME"],
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
"USER"] else ""
}
self.selectedAccount = 0
else:
self.user = {
'id': user_data["results"]["USER"]["USER_ID"],
'name': user_data["results"]["USER"]["BLOG_NAME"],
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
"USER"] else ""
}
self.logged_in = True
return True
def change_account(self, child):
if len(self.childs)-1 >= child:
self.user = {
'id': self.childs[child]["USER_ID"],
'name': self.childs[child]["BLOG_NAME"],
'picture': self.childs[child]["USER_PICTURE"] if "USER_PICTURE" in self.childs[child] else ""
}
self.selectedAccount = child
return (self.user, self.selectedAccount)
def get_child_accounts_gw(self):
return self.gw_api_call('deezer.getChildAccounts')['results']
def get_track_gw(self, sng_id):
body = None
if int(sng_id) > 0:
try:
body = self.gw_api_call('deezer.pageTrack', {'sng_id': sng_id})
except:
body = None
if body:
if 'LYRICS' in body['results']:
body['results']['DATA']['LYRICS'] = body['results']['LYRICS']
body['results'] = body['results']['DATA']
else:
body = self.gw_api_call('song.getData', {'sng_id': sng_id})
return body['results']
def get_tracks_gw(self, ids):
tracks_array = []
body = self.gw_api_call('song.getListData', {'sng_ids': ids})
errors = 0
for i in range(len(ids)):
if ids[i] != 0:
tracks_array.append(body['results']['data'][i - errors])
else:
errors += 1
tracks_array.append({
'SNG_ID': 0,
'SNG_TITLE': '',
'DURATION': 0,
'MD5_ORIGIN': 0,
'MEDIA_VERSION': 0,
'FILESIZE': 0,
'ALB_TITLE': "",
'ALB_PICTURE': "",
'ART_ID': 0,
'ART_NAME': ""
})
return tracks_array
def get_album_gw(self, alb_id):
return self.gw_api_call('album.getData', {'alb_id': alb_id})['results']
def get_album_details_gw(self, alb_id):
result = self.gw_api_call('deezer.pageAlbum',
{
'alb_id': alb_id,
'lang': 'en',
'header': True,
'tab': 0
})['results']
output = result['DATA']
duration = 0
for x in result['SONGS']['data']:
try:
duration += int(x['DURATION'])
except:
pass
output['DURATION'] = duration
output['NUMBER_TRACK'] = result['SONGS']['total']
output['LINK'] = f"https://deezer.com/album/{str(output['ALB_ID'])}"
return output
def get_album_tracks_gw(self, alb_id):
tracks_array = []
body = self.gw_api_call('song.getListByAlbum', {'alb_id': alb_id, 'nb': -1})
for track in body['results']['data']:
_track = track
_track['position'] = body['results']['data'].index(track)
tracks_array.append(_track)
return tracks_array
def get_artist_gw(self, art_id):
return self.gw_api_call('deezer.pageArtist', {'art_id': art_id})
def get_playlist_gw(self, playlist_id):
playlistAPI = self.gw_api_call('deezer.pagePlaylist', {'playlist_id': playlist_id, 'lang': 'en'})['results']['DATA']
return {
'id': playlistAPI['PLAYLIST_ID'],
'title': playlistAPI['TITLE'],
'description': playlistAPI['DESCRIPTION'],
'duration': playlistAPI['DURATION'],
'public': playlistAPI['STATUS'] == 1,
'is_loved_track': playlistAPI['TYPE'] == 4,
'collaborative': playlistAPI['STATUS'] == 2,
'nb_tracks': playlistAPI['NB_SONG'],
'fans': playlistAPI['NB_FAN'],
'link': "https://www.deezer.com/playlist/"+playlistAPI['PLAYLIST_ID'],
'share': "https://www.deezer.com/playlist/"+playlistAPI['PLAYLIST_ID'],
'picture': "https://api.deezer.com/playlist/"+playlistAPI['PLAYLIST_ID']+"/image",
'picture_small': "https://cdns-images.dzcdn.net/images/"+playlistAPI['PICTURE_TYPE']+"/"+playlistAPI['PLAYLIST_PICTURE']+"/56x56-000000-80-0-0.jpg",
'picture_medium': "https://cdns-images.dzcdn.net/images/"+playlistAPI['PICTURE_TYPE']+"/"+playlistAPI['PLAYLIST_PICTURE']+"/250x250-000000-80-0-0.jpg",
'picture_big': "https://cdns-images.dzcdn.net/images/"+playlistAPI['PICTURE_TYPE']+"/"+playlistAPI['PLAYLIST_PICTURE']+"/500x500-000000-80-0-0.jpg",
'picture_xl': "https://cdns-images.dzcdn.net/images/"+playlistAPI['PICTURE_TYPE']+"/"+playlistAPI['PLAYLIST_PICTURE']+"/1000x1000-000000-80-0-0.jpg",
'checksum': playlistAPI['CHECKSUM'],
'tracklist': "https://api.deezer.com/playlist/"+playlistAPI['PLAYLIST_ID']+"/tracks",
'creation_date': playlistAPI['DATE_ADD'],
'creator': {
'id': playlistAPI['PARENT_USER_ID'],
'name': playlistAPI['PARENT_USERNAME'],
'tracklist': "https://api.deezer.com/user/"+playlistAPI['PARENT_USER_ID']+"/flow",
'type': "user"
},
'type': "playlist"
}
def get_playlist_tracks_gw(self, playlist_id):
tracks_array = []
body = self.gw_api_call('playlist.getSongs', {'playlist_id': playlist_id, 'nb': -1})
for track in body['results']['data']:
track['position'] = body['results']['data'].index(track)
tracks_array.append(track)
return tracks_array
def get_artist_toptracks_gw(self, art_id):
tracks_array = []
body = self.gw_api_call('artist.getTopTrack', {'art_id': art_id, 'nb': 100})
for track in body['results']['data']:
track['position'] = body['results']['data'].index(track)
tracks_array.append(track)
return tracks_array
def get_artist_discography_gw(self, art_id, nb=100):
start = 0
releases = []
RELEASE_TYPE = {0:"single", 1:"album", 2:"compile", 3:"ep", 4:"bundle"}
result = {'all': []}
IDs = []
while True:
response = self.gw_api_call('album.getDiscography', {'art_id': art_id, "discography_mode":"all", 'nb': nb, 'nb_songs': 0, 'start': start})
releases += response['results']['data']
start += nb
if start > response['results']['total']:
break
for release in releases:
if release['ALB_ID'] not in IDs:
IDs.append(release['ALB_ID'])
obj = {
'id': release['ALB_ID'],
'title': release['ALB_TITLE'],
'link': f"https://www.deezer.com/album/{release['ALB_ID']}",
'cover': f"https://api.deezer.com/album/{release['ALB_ID']}/image",
'cover_small': f"https://cdns-images.dzcdn.net/images/cover/{release['ALB_PICTURE']}/56x56-000000-80-0-0.jpg",
'cover_medium': f"https://cdns-images.dzcdn.net/images/cover/{release['ALB_PICTURE']}/250x250-000000-80-0-0.jpg",
'cover_big': f"https://cdns-images.dzcdn.net/images/cover/{release['ALB_PICTURE']}/500x500-000000-80-0-0.jpg",
'cover_xl': f"https://cdns-images.dzcdn.net/images/cover/{release['ALB_PICTURE']}/1000x1000-000000-80-0-0.jpg",
'genre_id': release['GENRE_ID'],
'fans': release['RANK'],
'release_date': release['PHYSICAL_RELEASE_DATE'],
'record_type': RELEASE_TYPE.get(int(release['TYPE']), "unknown"),
'tracklist': f"https://api.deezer.com/album/{release['ALB_ID']}/tracks",
'explicit_lyrics': int(release['EXPLICIT_LYRICS']) > 0,
'type': release['__TYPE__'],
'nb_song': release['NUMBER_TRACK'],
'nb_disk': release['NUMBER_DISK']
}
if (release['ART_ID'] == art_id or release['ART_ID'] != art_id and release['ROLE_ID'] == 0) and release['ARTISTS_ALBUMS_IS_OFFICIAL']:
if not obj['record_type'] in result:
result[obj['record_type']] = []
result[obj['record_type']].append(obj)
result['all'].append(obj)
else:
if release['ROLE_ID'] == 5:
if not 'featured' in result:
result['featured'] = []
result['featured'].append(obj)
elif release['ROLE_ID'] == 0:
if not 'more' in result:
result['more'] = []
result['more'].append(obj)
result['all'].append(obj)
return result
def search_main_gw(self, term):
term = term
results = self.gw_api_call('deezer.pageSearch',
{"query": clean_search_query(term), "start": 0, "nb": 10, "suggest": True, "artist_suggest": True,
"top_tracks": True})['results']
order = []
for x in results['ORDER']:
if x in ['TOP_RESULT', 'TRACK', 'ALBUM', 'ARTIST', 'PLAYLIST']:
order.append(x)
if 'TOP_RESULT' in results and len(results['TOP_RESULT']):
orig_top_result = results['TOP_RESULT'][0]
top_result = {}
top_result['type'] = orig_top_result['__TYPE__']
if top_result['type'] == 'artist':
top_result['id'] = orig_top_result['ART_ID']
top_result['picture'] = 'https://e-cdns-images.dzcdn.net/images/artist/' + orig_top_result['ART_PICTURE']
top_result['title'] = orig_top_result['ART_NAME']
top_result['nb_fan'] = orig_top_result['NB_FAN']
elif top_result['type'] == 'album':
top_result['id'] = orig_top_result['ALB_ID']
top_result['picture'] = 'https://e-cdns-images.dzcdn.net/images/cover/' + orig_top_result['ALB_PICTURE']
top_result['title'] = orig_top_result['ALB_TITLE']
top_result['artist'] = orig_top_result['ART_NAME']
top_result['nb_song'] = orig_top_result['NUMBER_TRACK']
elif top_result['type'] == 'playlist':
top_result['id'] = orig_top_result['PLAYLIST_ID']
top_result['picture'] = 'https://e-cdns-images.dzcdn.net/images/' + orig_top_result['PICTURE_TYPE'] + '/' + orig_top_result['PLAYLIST_PICTURE']
top_result['title'] = orig_top_result['TITLE']
top_result['artist'] = orig_top_result['PARENT_USERNAME']
top_result['nb_song'] = orig_top_result['NB_SONG']
else:
top_result['id'] = "0"
top_result['picture'] = 'https://e-cdns-images.dzcdn.net/images/cover'
top_result['picture'] += '/156x156-000000-80-0-0.jpg'
top_result['link'] = 'https://deezer.com/'+top_result['type']+'/'+str(top_result['id'])
results['TOP_RESULT'][0] = top_result
results['ORDER'] = order
return results
def search_gw(self, term, type, start, nb=20):
return \
self.gw_api_call('search.music',
{"query": clean_search_query(term), "filter": "ALL", "output": type, "start": start, "nb": nb})[
'results']
def search_album_gw(self, term, start, nb=20):
results = self.search_gw(term, "ALBUM", start, nb)
ids = [x['ALB_ID'] for x in results['data']]
pool = eventlet.GreenPool(100)
albums = [a for a in pool.imap(self.get_album_details_gw, ids)]
return albums
def get_page_gw(self, page):
params = {
'gateway_input': json.dumps({
'PAGE': page,
'VERSION': '2.3',
'SUPPORT': {
'grid': [
'channel',
'album'
],
'horizontal-grid': [
'album'
],
},
'LANG': 'en'
})
}
return self.gw_api_call('page.get', params=params)
def get_new_releases(self):
explore = self.get_page_gw('channels/explore')
music_section = next((x for x in explore['results']['sections'] if x['title'] == 'Music'), None)
channels = [x['target'] for x in music_section['items']]
pool = eventlet.GreenPool(100)
new_releases_lists = [x for x in pool.imap(self.get_channel_new_releases, channels)]
seen = set()
new_releases = [seen.add(x['ALB_ID']) or x for list in new_releases_lists for x in list if x['ALB_ID'] not in seen]
new_releases.sort(key=lambda x: x['DIGITAL_RELEASE_DATE'], reverse=True)
now = datetime.datetime.now()
delta = datetime.timedelta(days=8)
recent_releases = [x for x in new_releases if now - datetime.datetime.strptime(x['DIGITAL_RELEASE_DATE'], "%Y-%m-%d") < delta]
recent_releases.sort(key=lambda x: x['ALB_ID'], reverse=True)
albums = [a for a in pool.imap(self.get_album_details_gw, [x['ALB_ID'] for x in recent_releases])]
return albums
def get_channel_new_releases(self, channel_name):
channel_data = self.get_page_gw(channel_name)
pattern = '^New.*releases$'
new_releases = next((x for x in channel_data['results']['sections'] if re.match(pattern, x['title'])), None)
if new_releases is not None:
show_all = self.get_page_gw(new_releases['target'])
albums = [x['data'] for x in show_all['results']['sections'][0]['items']]
return albums
return []
def get_lyrics_gw(self, sng_id):
return self.gw_api_call('song.getLyrics', {'sng_id': sng_id})["results"]
def get_user_playlists_gw(self, user_id):
data = self.gw_api_call('deezer.pageProfile', {'user_id': user_id, 'tab': 'playlists', 'nb': -1})['results']['TAB']['playlists']['data']
result = []
for playlist in data:
item = {
'id': playlist['PLAYLIST_ID'],
'title': playlist['TITLE'],
'nb_tracks': playlist['NB_SONG'],
'link': 'https://www.deezer.com/playlist/'+str(playlist['PLAYLIST_ID']),
'picture': 'https://api.deezer.com/playlist/'+str(playlist['PLAYLIST_ID'])+'/image',
'picture_small': 'https://e-cdns-images.dzcdn.net/images/'+playlist['PICTURE_TYPE']+'/'+playlist['PLAYLIST_PICTURE']+'/56x56-000000-80-0-0.jpg',
'picture_medium': 'https://e-cdns-images.dzcdn.net/images/'+playlist['PICTURE_TYPE']+'/'+playlist['PLAYLIST_PICTURE']+'/250x250-000000-80-0-0.jpg',
'picture_big': 'https://e-cdns-images.dzcdn.net/images/'+playlist['PICTURE_TYPE']+'/'+playlist['PLAYLIST_PICTURE']+'/500x500-000000-80-0-0.jpg',
'picture_xl': 'https://e-cdns-images.dzcdn.net/images/'+playlist['PICTURE_TYPE']+'/'+playlist['PLAYLIST_PICTURE']+'/1000x1000-000000-80-0-0.jpg',
'tracklist': 'https://api.deezer.com/playlist/'+str(playlist['PLAYLIST_ID'])+'/tracks',
'creator': {
'id': playlist['PARENT_USER_ID'],
'name': playlist['PARENT_USERNAME'] if 'PARENT_USERNAME' in playlist else self.user['name']
},
'type': 'playlist'
}
result.append(item)
return result
def get_user_albums_gw(self, user_id):
data = self.gw_api_call('deezer.pageProfile', {'user_id': user_id, 'tab': 'albums', 'nb': -1})['results']['TAB']['albums']['data']
result = []
for album in data:
item = {
'id': album['ALB_ID'],
'title': album['ALB_TITLE'],
'link': 'https://www.deezer.com/album/'+str(album['ALB_ID']),
'cover': 'https://api.deezer.com/album/'+str(album['ALB_ID'])+'/image',
'cover_small': 'https://e-cdns-images.dzcdn.net/images/cover/'+album['ALB_PICTURE']+'/56x56-000000-80-0-0.jpg',
'cover_medium': 'https://e-cdns-images.dzcdn.net/images/cover/'+album['ALB_PICTURE']+'/250x250-000000-80-0-0.jpg',
'cover_big': 'https://e-cdns-images.dzcdn.net/images/cover/'+album['ALB_PICTURE']+'/500x500-000000-80-0-0.jpg',
'cover_xl': 'https://e-cdns-images.dzcdn.net/images/cover/'+album['ALB_PICTURE']+'/1000x1000-000000-80-0-0.jpg',
'tracklist': 'https://api.deezer.com/album/'+str(album['ALB_ID'])+'/tracks',
'explicit_lyrics': album['EXPLICIT_ALBUM_CONTENT']['EXPLICIT_LYRICS_STATUS'] > 0,
'artist': {
'id': album['ART_ID'],
'name': album['ART_NAME'],
'picture': 'https://api.deezer.com/artist/'+str(album['ART_ID'])+'image',
'tracklist': 'https://api.deezer.com/artist/'+str(album['ART_ID'])+'/top?limit=50'
},
'type': 'album'
}
result.append(item)
return result
def get_user_artists_gw(self, user_id):
data = self.gw_api_call('deezer.pageProfile', {'user_id': user_id, 'tab': 'artists', 'nb': -1})['results']['TAB']['artists']['data']
result = []
for artist in data:
item = {
'id': artist['ART_ID'],
'name': artist['ART_NAME'],
'link': 'https://www.deezer.com/artist/'+str(artist['ART_ID']),
'picture': 'https://api.deezer.com/artist/'+str(artist['ART_ID'])+'/image',
'picture_small': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(artist['ART_ID'])+'/56x56-000000-80-0-0.jpg',
'picture_medium': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(artist['ART_ID'])+'/250x250-000000-80-0-0.jpg',
'picture_big': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(artist['ART_ID'])+'/500x500-000000-80-0-0.jpg',
'picture_xl': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(artist['ART_ID'])+'/1000x1000-000000-80-0-0.jpg',
'nb_fan': artist['NB_FAN'],
'tracklist': 'https://api.deezer.com/artist/'+str(artist['ART_ID'])+'/top?limit=50',
'type': 'artist'
}
result.append(item)
return result
def get_user_tracks_gw(self, user_id):
data = self.gw_api_call('deezer.pageProfile', {'user_id': user_id, 'tab': 'loved', 'nb': -1})['results']['TAB']['loved']['data']
result = []
for track in data:
item = {
'id': track['SNG_ID'],
'title': track['SNG_TITLE'],
'link': 'https://www.deezer.com/track/'+str(track['SNG_ID']),
'duration': track['DURATION'],
'rank': track['RANK_SNG'],
'explicit_lyrics': int(track['EXPLICIT_LYRICS']) > 0,
'explicit_content_lyrics': track['EXPLICIT_TRACK_CONTENT']['EXPLICIT_COVER_STATUS'],
'explicit_content_cover': track['EXPLICIT_TRACK_CONTENT']['EXPLICIT_LYRICS_STATUS'],
'time_add': track['DATE_ADD'],
'album': {
'id': track['ALB_ID'],
'title': track['ALB_TITLE'],
'cover': 'https://api.deezer.com/album/'+str(track['ALB_ID'])+'/image',
'cover_small': 'https://e-cdns-images.dzcdn.net/images/cover/'+str(track['ALB_PICTURE'])+'/56x56-000000-80-0-0.jpg',
'cover_medium': 'https://e-cdns-images.dzcdn.net/images/cover/'+str(track['ALB_PICTURE'])+'/250x250-000000-80-0-0.jpg',
'cover_big': 'https://e-cdns-images.dzcdn.net/images/cover/'+str(track['ALB_PICTURE'])+'/500x500-000000-80-0-0.jpg',
'cover_xl': 'https://e-cdns-images.dzcdn.net/images/cover/'+str(track['ALB_PICTURE'])+'/1000x1000-000000-80-0-0.jpg',
'tracklist': 'https://api.deezer.com/album/'+str(track['ALB_ID'])+'/tracks',
'type': 'album'
},
'artist': {
'id': track['ART_ID'],
'name': track['ART_NAME'],
'picture': 'https://api.deezer.com/artist/'+str(track['ART_ID'])+'/image',
'picture_small': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(track['ART_PICTURE'])+'/56x56-000000-80-0-0.jpg',
'picture_medium': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(track['ART_PICTURE'])+'/250x250-000000-80-0-0.jpg',
'picture_big': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(track['ART_PICTURE'])+'/500x500-000000-80-0-0.jpg',
'picture_xl': 'https://e-cdns-images.dzcdn.net/images/artist/'+str(track['ART_PICTURE'])+'/1000x1000-000000-80-0-0.jpg',
'tracklist': 'https://api.deezer.com/artist/'+str(track['ART_ID'])+'/top?limit=50',
'type': 'artist'
},
'type': 'track'
}
result.append(item)
return result
def refresh_user_favorites(self):
result = self.gw_api_call('user.getAllFeedbacks', {'checksums': self.checksums})['results']
checksums = self.checksums or {'DISLIKES': {}, 'FAVORITES': {}}
idsName = {
'SONGS': 'SNG_ID',
'ALBUMS': 'ALB_ID',
'ARTISTS': 'ART_ID',
'PLAYLISTS': 'PLAYLIST_ID'
}
for category in ['DISLIKES', 'FAVORITES']:
for section in result[category]:
if result[category][section] != "Not modified":
checksums[section] = result[category][section]['checksum']
if category == 'FAVORITES' and section.lower() in self.favorites:
self.favorites[section.lower()] = []
for release in result[category][section]['data']:
self.favorites[section.lower()].append(release[idsName[section]])
self.checksums = checksums
def add_to_favorites(self, type, id):
if type == 'track' and str(id) not in self.favorites['songs']:
self.gw_api_call('favorite_song.add', {'SNG_ID': str(id)})
self.favorites['songs'].append(str(id))
elif type == 'album' and str(id) not in self.favorites['albums']:
self.gw_api_call('album.addFavorite', {'ALB_ID': str(id)})
self.favorites['albums'].append(str(id))
elif type == 'artist' and str(id) not in self.favorites['artists']:
self.gw_api_call('artist.addFavorite', {'ART_ID': str(id)})
self.favorites['artists'].append(str(id))
elif type == 'playlist' and str(id) not in self.favorites['playlists']:
self.gw_api_call('playlist.addFavorite', {'PARENT_PLAYLIST_ID': str(id)})
self.favorites['playlists'].append(str(id))
def remove_from_favorites(self, type, id):
if type == 'track' and str(id) in self.favorites['songs']:
self.gw_api_call('favorite_song.remove', {'SNG_ID': str(id)})
self.favorites['songs'].remove(str(id))
elif type == 'album' and str(id) in self.favorites['albums']:
self.gw_api_call('album.deleteFavorite', {'ALB_ID': str(id)})
self.favorites['albums'].remove(str(id))
elif type == 'artist' and str(id) in self.favorites['artists']:
self.gw_api_call('artist.deleteFavorite', {'ART_ID': str(id)})
self.favorites['artists'].remove(str(id))
elif type == 'playlist' and str(id) in self.favorites['playlists']:
self.gw_api_call('playlist.deleteFavorite', {'PLAYLIST_ID': str(id)})
self.favorites['playlists'].remove(str(id))
def get_user_playlists(self, user_id):
return self.api_call('user/' + str(user_id) + '/playlists', {'limit': -1})
def get_user_albums(self, user_id):
return self.api_call('user/' + str(user_id) + '/albums', {'limit': -1})
def get_user_artists(self, user_id):
return self.api_call('user/' + str(user_id) + '/artists', {'limit': -1})
def get_user_tracks(self, user_id):
return self.api_call('user/' + str(user_id) + '/tracks', {'limit': -1})
def get_track(self, sng_id):
return self.api_call('track/' + str(sng_id))
def get_track_by_ISRC(self, isrc):
return self.api_call('track/isrc:' + isrc)
def get_charts_countries(self):
temp = self.get_user_playlists('637006841')['data']
result = sorted(temp, key=lambda k: k['title'])
if not result[0]['title'].startswith('Top'):
result = result[1:]
return result
def get_charts(self, limit=30):
return self.api_call('chart', {'limit': limit})
def get_playlist(self, playlist_id):
return self.api_call('playlist/' + str(playlist_id))
def get_playlist_tracks(self, playlist_id):
return self.api_call('playlist/' + str(playlist_id) + '/tracks', {'limit': -1})
def get_album(self, album_id):
return self.api_call('album/' + str(album_id))
def get_album_by_UPC(self, upc):
return self.api_call('album/upc:' + str(upc))
def get_album_tracks(self, album_id):
return self.api_call('album/' + str(album_id) + '/tracks', {'limit': -1})
def get_artist(self, artist_id):
return self.api_call('artist/' + str(artist_id))
def get_artist_albums(self, artist_id):
return self.api_call('artist/' + str(artist_id) + '/albums', {'limit': -1})
def get_artist_related(self, artist_id):
return self.api_call('artist/' + str(artist_id) + '/related', {'limit': -1})
def search(self, term, search_type, limit=30, index=0):
return self.api_call('search/' + search_type, {'q': clean_search_query(term), 'limit': limit, 'index': index})
def decrypt_track(self, track_id, input, output):
response = open(input, 'rb')
outfile = open(output, 'wb')
blowfish_key = str.encode(self._get_blowfish_key(str(track_id)))
i = 0
while True:
chunk = response.read(2048)
if not chunk:
break
if (i % 3) == 0 and len(chunk) == 2048:
chunk = Blowfish.new(blowfish_key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07").decrypt(
chunk)
outfile.write(chunk)
i += 1
def stream_track(self, track_id, url, stream):
try:
request = requests.get(url, headers=self.http_headers, stream=True, timeout=30)
except:
eventlet.sleep(2)
return self.stream_track(track_id, url, stream)
request.raise_for_status()
blowfish_key = str.encode(self._get_blowfish_key(str(track_id)))
i = 0
for chunk in request.iter_content(2048):
if (i % 3) == 0 and len(chunk) == 2048:
chunk = Blowfish.new(blowfish_key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07").decrypt(
chunk)
stream.write(chunk)
i += 1
def _md5(self, data):
h = MD5.new()
h.update(str.encode(data) if isinstance(data, str) else data)
return h.hexdigest()
def _get_blowfish_key(self, trackId):
SECRET = 'g4el58wc' + '0zvf9na1'
idMd5 = self._md5(trackId)
bfKey = ""
for i in range(16):
bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i]))
return bfKey
def get_track_stream_url(self, sng_id, md5, media_version, format):
urlPart = b'\xa4'.join(
[str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))])
md5val = self._md5(urlPart)
step2 = str.encode(md5val) + b'\xa4' + urlPart + b'\xa4'
step2 = pad(step2, 16)
urlPart = binascii.hexlify(AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).encrypt(step2))
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart.decode("utf-8")
def get_track_from_metadata(self, artist, track, album):
artist = artist.replace("", "-").replace("", "'")
track = track.replace("", "-").replace("", "'")
album = album.replace("", "-").replace("", "'")
resp = self.search(f'artist:"{artist}" track:"{track}" album:"{album}"', "track", 1)
if len(resp['data']) > 0:
return resp['data'][0]['id']
resp = self.search(f'artist:"{artist}" track:"{track}"', "track", 1)
if len(resp['data']) > 0:
return resp['data'][0]['id']
if "(" in track and ")" in track and track.find("(") < track.find(")"):
resp = self.search(f'artist:"{artist}" track:"{track[:track.find("(")]}"', "track", 1)
if len(resp['data']) > 0:
return resp['data'][0]['id']
elif " - " in track:
resp = self.search(f'artist:"{artist}" track:"{track[:track.find(" - ")]}"', "track", 1)
if len(resp['data']) > 0:
return resp['data'][0]['id']
else:
return "0"
return "0"
def clean_search_query(term):
term = str(term)
term = re.sub(r' feat[\.]? ', " ", term)
term = re.sub(r' ft[\.]? ', " ", term)
term = re.sub(r'\(feat[\.]? ', " ", term)
term = re.sub(r'\(ft[\.]? ', " ", term)
term = term.replace('&', " ").replace('', "-").replace('', "-")
return term
class APIError(Exception):
pass

View File

@ -1,4 +1,4 @@
from deemix.api.deezer import Deezer
from deezer import Deezer
from deemix.app.settings import Settings
from deemix.app.queuemanager import QueueManager
from deemix.app.spotifyhelper import SpotifyHelper

View File

@ -18,8 +18,10 @@ from deemix.app.queueitem import QISingle, QICollection
from deemix.app.track import Track, AlbumDoesntExists
from deemix.utils import changeCase
from deemix.utils.pathtemplates import generateFilename, generateFilepath, settingsRegexAlbum, settingsRegexArtist, settingsRegexPlaylistFile
from deemix.api.deezer import USER_AGENT_HEADER, TrackFormats
from deezer import TrackFormats
from deemix import USER_AGENT_HEADER
from deemix.utils.taggers import tagID3, tagFLAC
from deemix.utils.decryption import generateStreamURL, generateBlowfishKey
from deemix.app.settings import OverwriteOption, FeaturesOption
from Cryptodome.Cipher import Blowfish
@ -240,14 +242,14 @@ class DownloadJob:
if track.MD5 == '':
if track.fallbackId != "0":
logger.warn(f"[{track.mainArtist['name']} - {track.title}] Track not yet encoded, using fallback id")
newTrack = self.dz.get_track_gw(track.fallbackId)
newTrack = self.dz.gw.get_track_with_fallback(track.fallbackId)
track.parseEssentialData(self.dz, newTrack)
return self.download(trackAPI_gw, track)
elif not track.searched and self.settings['fallbackSearch']:
logger.warn(f"[{track.mainArtist['name']} - {track.title}] Track not yet encoded, searching for alternative")
searchedId = self.dz.get_track_from_metadata(track.mainArtist['name'], track.title, track.album['title'])
searchedId = self.dz.api.get_track_id_from_metadata(track.mainArtist['name'], track.title, track.album['title'])
if searchedId != "0":
newTrack = self.dz.get_track_gw(searchedId)
newTrack = self.dz.gw.get_track_with_fallback(searchedId)
track.parseEssentialData(self.dz, newTrack)
track.searched = True
if self.interface:
@ -271,14 +273,14 @@ class DownloadJob:
except PreferredBitrateNotFound:
if track.fallbackId != "0":
logger.warn(f"[{track.mainArtist['name']} - {track.title}] Track not found at desired bitrate, using fallback id")
newTrack = self.dz.get_track_gw(track.fallbackId)
newTrack = self.dz.gw.get_track_with_fallback(track.fallbackId)
track.parseEssentialData(self.dz, newTrack)
return self.download(trackAPI_gw, track)
elif not track.searched and self.settings['fallbackSearch']:
logger.warn(f"[{track.mainArtist['name']} - {track.title}] Track not found at desired bitrate, searching for alternative")
searchedId = self.dz.get_track_from_metadata(track.mainArtist['name'], track.title, track.album['title'])
searchedId = self.dz.api.get_track_id_from_metadata(track.mainArtist['name'], track.title, track.album['title'])
if searchedId != "0":
newTrack = self.dz.get_track_gw(searchedId)
newTrack = self.dz.gw.get_track_with_fallback(searchedId)
track.parseEssentialData(self.dz, newTrack)
track.searched = True
if self.interface:
@ -514,7 +516,7 @@ class DownloadJob:
if not trackAlreadyDownloaded or self.settings['overwriteFile'] == OverwriteOption.OVERWRITE:
logger.info(f"[{track.mainArtist['name']} - {track.title}] Downloading the track")
track.downloadUrl = self.dz.get_track_stream_url(track.id, track.MD5, track.mediaVersion, track.selectedFormat)
track.downloadUrl = generateStreamURL(track.id, track.MD5, track.mediaVersion, track.selectedFormat)
def downloadMusic(track, trackAPI_gw):
try:
@ -527,14 +529,14 @@ class DownloadJob:
if writepath.is_file(): writepath.unlink()
if track.fallbackId != "0":
logger.warn(f"[{track.mainArtist['name']} - {track.title}] Track not available, using fallback id")
newTrack = self.dz.get_track_gw(track.fallbackId)
newTrack = self.dz.gw.get_track_with_fallback(track.fallbackId)
track.parseEssentialData(self.dz, newTrack)
return False
elif not track.searched and self.settings['fallbackSearch']:
logger.warn(f"[{track.mainArtist['name']} - {track.title}] Track not available, searching for alternative")
searchedId = self.dz.get_track_from_metadata(track.mainArtist['name'], track.title, track.album['title'])
searchedId = self.dz.api.get_track_id_from_metadata(track.mainArtist['name'], track.title, track.album['title'])
if searchedId != "0":
newTrack = self.dz.get_track_gw(searchedId)
newTrack = self.dz.gw.get_track_with_fallback(searchedId)
track.parseEssentialData(self.dz, newTrack)
track.searched = True
if self.interface:
@ -638,7 +640,7 @@ class DownloadJob:
if int(track.filesizes[f"FILESIZE_{formatName}"]) != 0: return formatNumber
if not track.filesizes[f"FILESIZE_{formatName}_TESTED"]:
request = requests.head(
self.dz.get_track_stream_url(track.id, track.MD5, track.mediaVersion, formatNumber),
generateStreamURL(track.id, track.MD5, track.mediaVersion, formatNumber),
headers={'User-Agent': USER_AGENT_HEADER},
timeout=30
)
@ -679,8 +681,7 @@ class DownloadJob:
try:
with self.dz.session.get(track.downloadUrl, headers=headers, stream=True, timeout=10) as request:
request.raise_for_status()
blowfish_key = str.encode(self.dz._get_blowfish_key(str(track.id)))
blowfish_key = str.encode(generateBlowfishKey(str(track.id)))
complete = int(request.headers["Content-Length"])
if complete == 0: raise DownloadEmpty

View File

@ -1,6 +1,7 @@
from deemix.app.downloadjob import DownloadJob
from deemix.utils import getIDFromLink, getTypeFromLink, getBitrateInt
from deemix.api.deezer import APIError, LyricsStatus
from deezer.gw import APIError as gwAPIError, LyricsStatus
from deezer.api import APIError
from spotipy.exceptions import SpotifyException
from deemix.app.queueitem import QueueItem, QISingle, QICollection, QIConvertable
import logging
@ -26,7 +27,7 @@ class QueueManager:
# Check if is an isrc: url
if str(id).startswith("isrc"):
try:
trackAPI = dz.get_track(id)
trackAPI = dz.api.get_track(id)
except APIError as e:
e = json.loads(str(e))
return QueueError("https://deezer.com/track/"+str(id), f"Wrong URL: {e['type']+': ' if 'type' in e else ''}{e['message'] if 'message' in e else ''}")
@ -37,8 +38,8 @@ class QueueManager:
# Get essential track info
try:
trackAPI_gw = dz.get_track_gw(id)
except APIError as e:
trackAPI_gw = dz.gw.get_track_with_fallback(id)
except gwAPIError as e:
e = json.loads(str(e))
message = "Wrong URL"
if "DATA_ERROR" in e: message += f": {e['DATA_ERROR']}"
@ -74,7 +75,7 @@ class QueueManager:
def generateAlbumQueueItem(self, dz, id, settings, bitrate):
# Get essential album info
try:
albumAPI = dz.get_album(id)
albumAPI = dz.api.get_album(id)
except APIError as e:
e = json.loads(str(e))
return QueueError("https://deezer.com/album/"+str(id), f"Wrong URL: {e['type']+': ' if 'type' in e else ''}{e['message'] if 'message' in e else ''}")
@ -83,7 +84,7 @@ class QueueManager:
# Get extra info about album
# This saves extra api calls when downloading
albumAPI_gw = dz.get_album_gw(id)
albumAPI_gw = dz.gw.get_album(id)
albumAPI['nb_disk'] = albumAPI_gw['NUMBER_DISK']
albumAPI['copyright'] = albumAPI_gw['COPYRIGHT']
@ -91,7 +92,7 @@ class QueueManager:
if albumAPI['nb_tracks'] == 1:
return self.generateTrackQueueItem(dz, albumAPI['tracks']['data'][0]['id'], settings, bitrate, albumAPI=albumAPI)
tracksArray = dz.get_album_tracks_gw(id)
tracksArray = dz.gw.get_album_tracks(id)
if albumAPI['cover_small'] != None:
cover = albumAPI['cover_small'][:-24] + '/75x75-000000-80-0-0.jpg'
@ -126,14 +127,14 @@ class QueueManager:
def generatePlaylistQueueItem(self, dz, id, settings, bitrate):
# Get essential playlist info
try:
playlistAPI = dz.get_playlist(id)
playlistAPI = dz.api.get_playlist(id)
except:
playlistAPI = None
# Fallback to gw api if the playlist is private
if not playlistAPI:
try:
playlistAPI = dz.get_playlist_gw(id)
except APIError as e:
playlistAPI = dz.gw.get_playlist_page(id)
except gwAPIError as e:
e = json.loads(str(e))
message = "Wrong URL"
if "DATA_ERROR" in e:
@ -141,12 +142,12 @@ class QueueManager:
return QueueError("https://deezer.com/playlist/"+str(id), message)
# Check if private playlist and owner
if not playlistAPI['public'] and playlistAPI['creator']['id'] != str(dz.user['id']):
if not playlistAPI['public'] and playlistAPI['creator']['id'] != str(dz.current_user['id']):
logger.warn("You can't download others private playlists.")
return QueueError("https://deezer.com/playlist/"+str(id), "You can't download others private playlists.", "notYourPrivatePlaylist")
playlistTracksAPI = dz.get_playlist_tracks_gw(id)
playlistAPI['various_artist'] = dz.get_artist(5080) # Useful for save as compilation
playlistTracksAPI = dz.gw.get_playlist_tracks(id)
playlistAPI['various_artist'] = dz.api.get_artist(5080) # Useful for save as compilation
totalSize = len(playlistTracksAPI)
playlistAPI['nb_tracks'] = totalSize
@ -178,14 +179,14 @@ class QueueManager:
def generateArtistQueueItem(self, dz, id, settings, bitrate, interface=None):
# Get essential artist info
try:
artistAPI = dz.get_artist(id)
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = json.loads(str(e))
return QueueError("https://deezer.com/artist/"+str(id), f"Wrong URL: {e['type']+': ' if 'type' in e else ''}{e['message'] if 'message' in e else ''}")
if interface: interface.send("startAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
artistDiscographyAPI = dz.get_artist_discography_gw(id, 100)
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(id, 100)
allReleases = artistDiscographyAPI.pop('all', [])
albumList = []
for album in allReleases:
@ -197,14 +198,14 @@ class QueueManager:
def generateArtistDiscographyQueueItem(self, dz, id, settings, bitrate, interface=None):
# Get essential artist info
try:
artistAPI = dz.get_artist(id)
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = json.loads(str(e))
return QueueError("https://deezer.com/artist/"+str(id)+"/discography", f"Wrong URL: {e['type']+': ' if 'type' in e else ''}{e['message'] if 'message' in e else ''}")
if interface: interface.send("startAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
artistDiscographyAPI = dz.get_artist_discography_gw(id, 100)
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(id, 100)
artistDiscographyAPI.pop('all', None) # all contains albums and singles, so its all duplicates. This removes them
albumList = []
for type in artistDiscographyAPI:
@ -217,7 +218,7 @@ class QueueManager:
def generateArtistTopQueueItem(self, dz, id, settings, bitrate, interface=None):
# Get essential artist info
try:
artistAPI = dz.get_artist(id)
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = json.loads(str(e))
return QueueError("https://deezer.com/artist/"+str(id)+"/top_track", f"Wrong URL: {e['type']+': ' if 'type' in e else ''}{e['message'] if 'message' in e else ''}")
@ -252,8 +253,8 @@ class QueueManager:
'type': "playlist"
}
artistTopTracksAPI_gw = dz.get_artist_toptracks_gw(id)
playlistAPI['various_artist'] = dz.get_artist(5080) # Useful for save as compilation
artistTopTracksAPI_gw = dz.gw.get_artist_toptracks(id)
playlistAPI['various_artist'] = dz.api.get_artist(5080) # Useful for save as compilation
totalSize = len(artistTopTracksAPI_gw)
playlistAPI['nb_tracks'] = totalSize

View File

@ -2,7 +2,7 @@ import json
from pathlib import Path
from os import makedirs, listdir
from deemix import __version__ as deemixVersion
from deemix.api.deezer import TrackFormats
from deezer import TrackFormats
from deemix.utils import checkFolder
import logging
import datetime

View File

@ -151,7 +151,7 @@ class SpotifyHelper:
if str(track_id) in cache['tracks']:
dz_track = None
if cache['tracks'][str(track_id)]['isrc']:
dz_track = dz.get_track_by_ISRC(cache['tracks'][str(track_id)]['isrc'])
dz_track = dz.api.get_track_by_ISRC(cache['tracks'][str(track_id)]['isrc'])
dz_id = dz_track['id'] if 'id' in dz_track and 'title' in dz_track else "0"
cache['tracks'][str(track_id)]['id'] = dz_id
return (cache['tracks'][str(track_id)]['id'], dz_track, cache['tracks'][str(track_id)]['isrc'])
@ -164,15 +164,21 @@ class SpotifyHelper:
isrc = None
if 'external_ids' in spotify_track and 'isrc' in spotify_track['external_ids']:
try:
dz_track = dz.get_track_by_ISRC(spotify_track['external_ids']['isrc'])
dz_track = dz.api.get_track_by_ISRC(spotify_track['external_ids']['isrc'])
dz_id = dz_track['id'] if 'id' in dz_track and 'title' in dz_track else "0"
isrc = spotify_track['external_ids']['isrc']
except:
dz_id = dz.get_track_from_metadata(spotify_track['artists'][0]['name'], spotify_track['name'],
spotify_track['album']['name']) if fallbackSearch else "0"
dz_id = dz.api.get_track_id_from_metadata(
artist=spotify_track['artists'][0]['name'],
track=spotify_track['name'],
album=spotify_track['album']['name']
) if fallbackSearch else "0"
elif fallbackSearch:
dz_id = dz.get_track_from_metadata(spotify_track['artists'][0]['name'], spotify_track['name'],
spotify_track['album']['name'])
dz_id = dz.api.get_track_id_from_metadata(
artist=spotify_track['artists'][0]['name'],
track=spotify_track['name'],
album=spotify_track['album']['name']
)
if singleTrack:
cache['tracks'][str(track_id)] = {'id': dz_id, 'isrc': isrc}
with open(self.configFolder / 'spotifyCache.json', 'w') as spotifyCache:
@ -195,12 +201,12 @@ class SpotifyHelper:
upc = None
if 'external_ids' in spotify_album and 'upc' in spotify_album['external_ids']:
try:
dz_album = dz.get_album_by_UPC(spotify_album['external_ids']['upc'])
dz_album = dz.api.get_album_by_UPC(spotify_album['external_ids']['upc'])
dz_album = dz_album['id'] if 'id' in dz_album else "0"
upc = spotify_album['external_ids']['upc']
except:
try:
dz_album = dz.get_album_by_UPC(int(spotify_album['external_ids']['upc']))
dz_album = dz.api.get_album_by_UPC(int(spotify_album['external_ids']['upc']))
dz_album = dz_album['id'] if 'id' in dz_album else "0"
except:
dz_album = "0"
@ -221,7 +227,7 @@ class SpotifyHelper:
cover = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/75x75-000000-80-0-0.jpg"
playlistAPI = self._convert_playlist_structure(spotify_playlist)
playlistAPI['various_artist'] = dz.get_artist(5080)
playlistAPI['various_artist'] = dz.api.get_artist(5080)
extra = {}
extra['unconverted'] = []
@ -271,7 +277,7 @@ class SpotifyHelper:
trackID = cache['tracks'][str(track['id'])]['id']
trackAPI = None
if cache['tracks'][str(track['id'])]['isrc']:
trackAPI = dz.get_track_by_ISRC(cache['tracks'][str(track['id'])]['isrc'])
trackAPI = dz.api.get_track_by_ISRC(cache['tracks'][str(track['id'])]['isrc'])
else:
(trackID, trackAPI, isrc) = self.get_trackid_spotify(dz, "0", queueItem.settings['fallbackSearch'], track)
cache['tracks'][str(track['id'])] = {
@ -292,7 +298,7 @@ class SpotifyHelper:
'ART_NAME': track['artists'][0]['name']
}
else:
deezerTrack = dz.get_track_gw(trackID)
deezerTrack = dz.gw.get_track_with_fallback(trackID)
deezerTrack['_EXTRA_PLAYLIST'] = queueItem.extra['playlistAPI']
if trackAPI:
deezerTrack['_EXTRA_TRACK'] = trackAPI

View File

@ -1,6 +1,10 @@
import eventlet
requests = eventlet.import_patched('requests')
import logging
from deemix.api.deezer import APIError, LyricsStatus
from deezer.gw import APIError as gwAPIError, LyricsStatus
from deezer.api import APIError
from deemix.utils import removeFeatures, andCommaConcat, uniqueArray, generateReplayGainString
logging.basicConfig(level=logging.INFO)
@ -66,7 +70,7 @@ class Track:
if 'FALLBACK' in trackAPI_gw:
self.fallbackId = trackAPI_gw['FALLBACK']['SNG_ID']
if int(self.id) > 0:
self.filesizes = dz.get_track_filesizes(self.id)
self.filesizes = self.getFilesizes(dz)
def parseLocalTrackData(self, trackAPI_gw):
# Local tracks has only the trackAPI_gw page and
@ -132,8 +136,8 @@ class Track:
if not "LYRICS" in trackAPI_gw and self.lyrics['id'] != 0:
logger.info(f"[{trackAPI_gw['ART_NAME']} - {self.title}] Getting lyrics")
try:
trackAPI_gw["LYRICS"] = dz.get_lyrics_gw(self.id)
except APIError:
trackAPI_gw["LYRICS"] = dz.gw.get_track_lyrics(self.id)
except gwAPIError:
self.lyrics['id'] = 0
if self.lyrics['id'] != 0:
self.lyrics['unsync'] = trackAPI_gw["LYRICS"].get("LYRICS_TEXT")
@ -184,7 +188,7 @@ class Track:
if not albumAPI:
logger.info(f"[{self.mainArtist['name']} - {self.title}] Getting album infos")
try:
albumAPI = dz.get_album(self.album['id'])
albumAPI = dz.api.get_album(self.album['id'])
except APIError:
albumAPI = None
@ -248,8 +252,8 @@ class Track:
if not albumAPI_gw:
logger.info(f"[{self.mainArtist['name']} - {self.title}] Getting more album infos")
try:
albumAPI_gw = dz.get_album_gw(self.album['id'])
except APIError:
albumAPI_gw = dz.gw.get_album(self.album['id'])
except gwAPIError:
albumAPI_gw = None
raise AlbumDoesntExists
@ -264,7 +268,7 @@ class Track:
# Getting artist image ID
# ex: https://e-cdns-images.dzcdn.net/images/artist/f2bc007e9133c946ac3c3907ddc5d2ea/56x56-000000-80-0-0.jpg
logger.info(f"[{self.mainArtist['name']} - {self.title}] Getting artist picture fallback")
artistAPI = dz.get_artist(self.album['mainArtist']['id'])
artistAPI = dz.api.get_artist(self.album['mainArtist']['id'])
self.album['mainArtist']['pic'] = artistAPI['picture_small'][artistAPI['picture_small'].find('artist/') + 7:-24]
self.album['artists'] = [albumAPI_gw['ART_NAME']]
@ -294,7 +298,7 @@ class Track:
if not trackAPI:
logger.info(f"[{self.mainArtist['name']} - {self.title}] Getting extra track infos")
trackAPI = dz.get_track(self.id)
trackAPI = dz.api.get_track(self.id)
self.bpm = trackAPI['bpm']
if not self.replayGain and 'gain' in trackAPI:
@ -327,13 +331,13 @@ class Track:
if not self.album['discTotal']:
if not albumAPI_gw:
logger.info(f"[{self.mainArtist['name']} - {self.title}] Getting more album infos")
albumAPI_gw = dz.get_album_gw(self.album['id'])
albumAPI_gw = dz.gw.get_album(self.album['id'])
self.album['discTotal'] = albumAPI_gw['NUMBER_DISK']
if not self.copyright:
if not albumAPI_gw:
logger.info(f"[{self.mainArtist['name']} - {self.title}] Getting more album infos")
albumAPI_gw = dz.get_album_gw(self.album['id'])
albumAPI_gw = dz.gw.get_album(self.album['id'])
self.copyright = albumAPI_gw['COPYRIGHT']
def parsePlaylistData(self, playlist, settings):
@ -393,6 +397,36 @@ class Track:
if 'Featured' in self.artist:
self.featArtistsString = "feat. "+andCommaConcat(self.artist['Featured'])
def getFilesizes(self, dz):
try:
guest_sid = dz.session.cookies.get('sid')
site = requests.post(
"https://api.deezer.com/1.0/gateway.php",
params={
'api_key': "4VCYIJUCDLOUELGD1V8WBVYBNVDYOXEWSLLZDONGBBDFVXTZJRXPR29JRLQFO6ZE",
'sid': guest_sid,
'input': '3',
'output': '3',
'method': 'song_getData'
},
timeout=30,
json={'sng_id': self.id},
headers=dz.http_headers
)
result_json = site.json()
except:
eventlet.sleep(2)
return self.getFilesizes(dz)
if len(result_json['error']):
raise APIError(json.dumps(result_json['error']))
response = result_json.get("results")
filesizes = {}
for key, value in response.items():
if key.startswith("FILESIZE_"):
filesizes[key] = value
filesizes[key+"_TESTED"] = False
return filesizes
class TrackError(Exception):
"""Base class for exceptions in this module."""
pass

View File

@ -1,6 +1,6 @@
import re
import string
from deemix.api.deezer import TrackFormats
from deezer import TrackFormats
import os
def generateReplayGainString(trackGain):

View File

@ -0,0 +1,26 @@
import binascii
from Cryptodome.Cipher import Blowfish, AES
from Cryptodome.Hash import MD5
from Cryptodome.Util.Padding import pad
def _md5(data):
h = MD5.new()
h.update(str.encode(data) if isinstance(data, str) else data)
return h.hexdigest()
def generateBlowfishKey(trackId):
SECRET = 'g4el58wc' + '0zvf9na1'
idMd5 = _md5(trackId)
bfKey = ""
for i in range(16):
bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i]))
return bfKey
def generateStreamURL(sng_id, md5, media_version, format):
urlPart = b'\xa4'.join(
[str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))])
md5val = _md5(urlPart)
step2 = str.encode(md5val) + b'\xa4' + urlPart + b'\xa4'
step2 = pad(step2, 16)
urlPart = binascii.hexlify(AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).encrypt(step2))
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart.decode("utf-8")

View File

@ -2,7 +2,7 @@ import re
from os.path import sep as pathSep
from pathlib import Path
from unicodedata import normalize
from deemix.api.deezer import TrackFormats
from deezer import TrackFormats
bitrateLabels = {
TrackFormats.MP4_RA3: "360 HQ",

View File

@ -4,3 +4,4 @@ mutagen
requests
spotipy>=2.11.0
eventlet
deezer-py

View File

@ -7,7 +7,7 @@ README = (HERE / "README.md").read_text()
setup(
name="deemix",
version="1.5.21",
version="2.0.1",
description="A barebone deezer downloader library",
long_description=README,
long_description_content_type="text/markdown",
@ -24,7 +24,7 @@ setup(
python_requires='>=3.6',
packages=find_packages(exclude=("tests",)),
include_package_data=True,
install_requires=["click", "pycryptodomex", "mutagen", "requests", "spotipy>=2.11.0", "eventlet"],
install_requires=["click", "pycryptodomex", "mutagen", "requests", "spotipy>=2.11.0", "eventlet", "deezer-py"],
entry_points={
"console_scripts": [
"deemix=deemix.__main__:download",