Merge branch 'pythonish' of nomorecoffee/deemix1 into master

This commit is contained in:
RemixDev 2020-02-17 19:46:07 +00:00 committed by Gogs
commit 9e831328c4
8 changed files with 192 additions and 171 deletions

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import wx
from deemix.ui.MainFrame import MainFrame
if __name__ == '__main__':

View File

@ -1,20 +1,21 @@
#!/usr/bin/env python3
from urllib.request import urlopen
import requests
import json
import re
import hashlib
import pyaes
import binascii
import hashlib
from urllib.request import urlopen
import blowfish
import pyaes
import requests
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
class Deezer:
def __init__(self):
self.api_url = "http://www.deezer.com/ajax/gw-light.php"
self.legacy_api_url = "https://api.deezer.com/"
self.http_headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
"User-Agent": USER_AGENT_HEADER
}
self.album_pictures_host = "https://e-cdns-images.dzcdn.net/images/cover/"
self.artist_pictures_host = "https://e-cdns-images.dzcdn.net/images/artist/"
@ -22,14 +23,15 @@ class Deezer:
self.session = requests.Session()
self.logged_in = False
self.session.post("http://www.deezer.com/", headers=self.http_headers)
self.sid = self.session.cookies.get_dict()['sid']
self.sid = self.session.cookies.get('sid')
def get_token(self):
tokenData = self.gw_api_call('deezer.getUserData')
return tokenData["results"]["checkForm"]
token_data = self.gw_api_call('deezer.getUserData')
return token_data["results"]["checkForm"]
def get_track_MD5(self, id):
site = self.session.post("https://api.deezer.com/1.0/gateway.php",
def get_track_md5(self, sng_id):
site = self.session.post(
"https://api.deezer.com/1.0/gateway.php",
params={
'api_key': "4VCYIJUCDLOUELGD1V8WBVYBNVDYOXEWSLLZDONGBBDFVXTZJRXPR29JRLQFO6ZE",
'sid': self.sid,
@ -37,10 +39,10 @@ class Deezer:
'output': '3',
'method': 'song_getData'
},
data = json.dumps({'sng_id': id}),
json={'sng_id': sng_id},
headers=self.http_headers
)
response = json.loads(site.text)
response = site.json()
return response['results']['PUID']
def gw_api_call(self, method, args={}):
@ -52,11 +54,10 @@ class Deezer:
'input': '3',
'method': method
},
data = json.dumps(args),
json=args,
headers=self.http_headers
)
result = json.loads(result.text)
return result
return result.json()
def api_call(self, method, args={}):
result = self.session.get(
@ -64,33 +65,34 @@ class Deezer:
params=args,
headers=self.http_headers
)
result_json = json.loads(result.text)
result_json = result.json()
if 'error' in result_json.keys():
raise APIError()
return result_json
def login(self, email, password, reCaptchaToken):
checkFormLogin = self.gw_api_call("deezer.getUserData")
def login(self, email, password, re_captcha_token):
check_form_login = self.gw_api_call("deezer.getUserData")
login = self.session.post(
"https://www.deezer.com/ajax/action.php",
data={
'type': 'login',
'mail': email,
'password': password,
'checkFormLogin':checkFormLogin['results']['checkFormLogin'],
'reCaptchaToken': reCaptchaToken
'checkFormLogin': check_form_login['results']['checkFormLogin'],
'reCaptchaToken': re_captcha_token
},
headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'}.update(self.http_headers)
headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', **self.http_headers}
)
if not 'success' in login.text:
if 'success' not in login.text:
self.logged_in = False
return False
userData = self.gw_api_call("deezer.getUserData")
user_data = self.gw_api_call("deezer.getUserData")
self.user = {
'email': email,
'id': userData["results"]["USER"]["USER_ID"],
'name': userData["results"]["USER"]["BLOG_NAME"],
'picture': userData["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in userData["results"]["USER"] else ""
'id': user_data["results"]["USER"]["USER_ID"],
'name': user_data["results"]["USER"]["BLOG_NAME"],
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
"USER"] else ""
}
self.logged_in = True
return True
@ -104,38 +106,39 @@ class Deezer:
rest={'HttpOnly': True}
)
self.session.cookies.set_cookie(cookie_obj)
userData = self.gw_api_call("deezer.getUserData")
if (userData["results"]["USER"]["USER_ID"] == 0):
user_data = self.gw_api_call("deezer.getUserData")
if user_data["results"]["USER"]["USER_ID"] == 0:
self.logged_in = False
return False
self.user = {
'id': userData["results"]["USER"]["USER_ID"],
'name': userData["results"]["USER"]["BLOG_NAME"],
'picture': userData["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in userData["results"]["USER"] else ""
'id': user_data["results"]["USER"]["USER_ID"],
'name': user_data["results"]["USER"]["BLOG_NAME"],
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
"USER"] else ""
}
self.logged_in = True
return True
def get_track_gw(self, id):
if (int(id)<0):
body = self.gw_api_call('song.getData', {'sng_id': id})
def get_track_gw(self, sng_id):
if int(sng_id) < 0:
body = self.gw_api_call('song.getData', {'sng_id': sng_id})
else:
body = self.gw_api_call('deezer.pageTrack', {'sng_id': id})
body = self.gw_api_call('deezer.pageTrack', {'sng_id': sng_id})
if 'LYRICS' in body['results']:
body['results']['DATA']['LYRICS'] = body['results']['LYRICS']
body['results'] = body['results']['DATA']
return body['results']
def get_tracks_gw(self, ids):
tracksArray = []
tracks_array = []
body = self.gw_api_call('song.getListData', {'sng_ids': ids})
errors = 0
for i in range(len(ids)):
if ids[i] != 0:
tracksArray.append(body['results']['data'][i-errors])
tracks_array.append(body['results']['data'][i - errors])
else:
errors += 1
tracksArray.append({
tracks_array.append({
'SNG_ID': 0,
'SNG_TITLE': '',
'DURATION': 0,
@ -147,68 +150,69 @@ class Deezer:
'ART_ID': 0,
'ART_NAME': ""
})
return tracksArray
return tracks_array
def get_album_gw(self, id):
body = self.gw_api_call('album.getData', {'alb_id': id})
def get_album_gw(self, alb_id):
body = self.gw_api_call('album.getData', {'alb_id': alb_id})
return body['results']
def get_album_tracks_gw(self, id):
tracksArray = []
body = self.gw_api_call('song.getListByAlbum', {'alb_id': id, 'nb': -1})
def get_album_tracks_gw(self, alb_id):
tracks_array = []
body = self.gw_api_call('song.getListByAlbum', {'alb_id': alb_id, 'nb': -1})
for track in body['results']['data']:
_track = track
_track['position'] = body['results']['data'].index(track)
tracksArray.append(_track)
return tracksArray
tracks_array.append(_track)
return tracks_array
def get_artist_gw(self, id):
body = self.gw_api_call('deezer.pageArtist', {'art_id': id})
def get_artist_gw(self, art_id):
body = self.gw_api_call('deezer.pageArtist', {'art_id': art_id})
return body
def get_playlist_gw(self, id):
body = self.gw_api_call('deezer.pagePlaylist', {'playlist_id': id})
def get_playlist_gw(self, playlist_id):
body = self.gw_api_call('deezer.pagePlaylist', {'playlist_id': playlist_id})
return body
def get_playlist_tracks_gw(self, id):
tracksArray = []
body = self.gw_api_call('playlist.getSongs', {'playlist_id': id, 'nb': -1})
def get_playlist_tracks_gw(self, playlist_id):
tracks_array = []
body = self.gw_api_call('playlist.getSongs', {'playlist_id': playlist_id, 'nb': -1})
for track in body['results']['data']:
_track = track
_track['position'] = body['results']['data'].index(track)
tracksArray.append(_track)
return tracksArray
track['position'] = body['results']['data'].index(track)
tracks_array.append(track)
return tracks_array
def get_artist_toptracks_gw(self, id):
tracksArray = []
body = self.gw_api_call('artist.getTopTrack', {'art_id': id, 'nb': 100})
def get_artist_toptracks_gw(self, art_id):
tracks_array = []
body = self.gw_api_call('artist.getTopTrack', {'art_id': art_id, 'nb': 100})
for track in body['results']['data']:
_track = track
_track['position'] = body['results']['data'].index(track)
tracksArray.append(_track)
return tracksArray
track['position'] = body['results']['data'].index(track)
tracks_array.append(track)
return tracks_array
def get_lyrics_gw(self, id):
body = self.gw_api_call('song.getLyrics', {'sng_id': id})
lyr = {}
lyr['unsyncLyrics'] = {
def get_lyrics_gw(self, sng_id):
body = self.gw_api_call('song.getLyrics', {'sng_id': sng_id})
lyr = {
'unsyncLyrics': {
'description': "",
'lyrics': body["results"]["LYRICS_TEXT"]
},
'syncLyrics': "",
}
lyr['syncLyrics'] = ""
for i in range(len(body["results"]["LYRICS_SYNC_JSON"])):
if "lrc_timestamp" in body["results"]["LYRICS_SYNC_JSON"][i]:
lyr['syncLyrics'] += body["results"]["LYRICS_SYNC_JSON"][i]["lrc_timestamp"] + body["results"]["LYRICS_SYNC_JSON"][i]["line"]+"\r\n"
lyr['syncLyrics'] += body["results"]["LYRICS_SYNC_JSON"][i]["lrc_timestamp"] + \
body["results"]["LYRICS_SYNC_JSON"][i]["line"] + "\r\n"
elif i + 1 < len(body["results"]["LYRICS_SYNC_JSON"]):
lyr['syncLyrics'] += body["results"]["LYRICS_SYNC_JSON"][i+1]["lrc_timestamp"] + body["results"]["LYRICS_SYNC_JSON"][i]["line"]+"\r\n"
lyr['syncLyrics'] += body["results"]["LYRICS_SYNC_JSON"][i + 1]["lrc_timestamp"] + \
body["results"]["LYRICS_SYNC_JSON"][i]["line"] + "\r\n"
return lyr
def get_user_playlist(self, id):
body = self.api_call('user/'+str(id)+'/playlists', {'limit': -1})
def get_user_playlist(self, user_id):
body = self.api_call('user/' + str(user_id) + '/playlists', {'limit': -1})
return body
def get_track(self, id):
body = self.api_call('track/'+str(id))
def get_track(self, user_id):
body = self.api_call('track/' + str(user_id))
return body
def get_track_by_ISRC(self, isrc):
@ -218,41 +222,41 @@ class Deezer:
def get_charts_top_country(self):
return self.get_user_playlist('637006841')
def get_playlist(self, id):
body = self.api_call('playlist/'+str(id))
def get_playlist(self, playlist_id):
body = self.api_call('playlist/' + str(playlist_id))
return body
def get_playlist_tracks(self, id):
body = self.api_call('playlist/'+str(id)+'/tracks', {'limit': -1})
def get_playlist_tracks(self, playlist_id):
body = self.api_call('playlist/' + str(playlist_id) + '/tracks', {'limit': -1})
return body
def get_album(self, id):
body = self.api_call('album/'+str(id))
def get_album(self, album_id):
body = self.api_call('album/' + str(album_id))
return body
def get_album_by_UPC(self, upc):
body = self.api_call('album/upc:' + str(upc))
def get_album_tracks(self, id):
body = self.api_call('album/'+str(id)+'/tracks', {'limit': -1})
def get_album_tracks(self, album_id):
body = self.api_call('album/' + str(album_id) + '/tracks', {'limit': -1})
return body
def get_artist(self, id):
body = self.api_call('artist/'+str(id))
def get_artist(self, artist_id):
body = self.api_call('artist/' + str(artist_id))
return body
def get_artist_albums(self, id):
body = self.api_call('artist/'+str(id)+'/albums', {'limit': -1})
def get_artist_albums(self, artist_id):
body = self.api_call('artist/' + str(artist_id) + '/albums', {'limit': -1})
return body
def search(self, term, type, limit = 30):
body = self.api_call('search/'+type, {'q': term, 'limit': limit})
def search(self, term, search_type, limit=30):
body = self.api_call('search/' + search_type, {'q': term, 'limit': limit})
return body
def decrypt_track(self, trackId, input, output):
def decrypt_track(self, track_id, input, output):
response = open(input, 'rb')
outfile = open(output, 'wb')
cipher = blowfish.Cipher(str.encode(self._get_blowfish_key(str(trackId))))
cipher = blowfish.Cipher(str.encode(self._get_blowfish_key(str(track_id))))
i = 0
while True:
chunk = response.read(2048)
@ -263,9 +267,9 @@ class Deezer:
outfile.write(chunk)
i += 1
def stream_track(self, trackId, url, stream):
def stream_track(self, track_id, url, stream):
response = urlopen(url)
cipher = blowfish.Cipher(str.encode(self._get_blowfish_key(str(trackId))))
cipher = blowfish.Cipher(str.encode(self._get_blowfish_key(str(track_id))))
i = 0
while True:
chunk = response.read(2048)
@ -297,7 +301,8 @@ class Deezer:
return bfKey
def get_track_stream_url(self, sng_id, md5, media_version, format):
urlPart = b'\xa4'.join([str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))])
urlPart = b'\xa4'.join(
[str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))])
md5val = self._md5(urlPart)
step2 = str.encode(md5val) + b'\xa4' + urlPart + b'\xa4'
while len(step2) % 16 > 0:
@ -305,5 +310,6 @@ class Deezer:
urlPart = self._ecb_crypt(b'jo6aey6haid2Teih', step2)
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart.decode("utf-8")
class APIError(Exception):
pass

View File

@ -1,8 +1,8 @@
#!/usr/bin/env python3
import re
from deemix.api.deezer import Deezer, APIError
from deemix.utils.taggers import tagID3, tagFLAC
import json
import re
extensions = {
9: '.flac',
@ -16,6 +16,7 @@ extensions = {
dz = Deezer()
def getIDFromLink(link, type):
if '?' in link:
link = link[:link.find('?')]
@ -35,10 +36,11 @@ def getIDFromLink(link, type):
if type == "spotifyalbum":
return link[link.find("album:") + 6]
elif type == "artisttop":
return re.search("\/artist\/(\d+)\/top_track",link)[1]
return re.search(r"\/artist\/(\d+)\/top_track", link)[1]
else:
return link[link.rfind("/") + 1:]
def getTypeFromLink(link):
type = ''
if 'spotify' in link:
@ -62,12 +64,13 @@ def getTypeFromLink(link):
type = 'artist'
return type
def getTrackData(id):
if not id:
return None
trackAPI = dz.get_track_gw(id)
if not 'MD5_ORIGIN' in trackAPI:
trackAPI['MD5_ORIGIN'] = dz.get_track_MD5(id)
trackAPI['MD5_ORIGIN'] = dz.get_track_md5(id)
track = {}
track['id'] = trackAPI['SNG_ID']
@ -131,9 +134,11 @@ def getTrackData(id):
track['lyrics']['sync'] = ""
for i in range(len(trackAPI["LYRICS"]["LYRICS_SYNC_JSON"])):
if "lrc_timestamp" in trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]:
track['lyrics']['sync'] += trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]["lrc_timestamp"] + trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]["line"]+"\r\n"
track['lyrics']['sync'] += trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]["lrc_timestamp"] + \
trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]["line"] + "\r\n"
elif i + 1 < len(trackAPI["LYRICS"]["LYRICS_SYNC_JSON"]):
track['lyrics']['sync'] += trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i+1]["lrc_timestamp"] + trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]["line"]+"\r\n"
track['lyrics']['sync'] += trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i + 1]["lrc_timestamp"] + \
trackAPI["LYRICS"]["LYRICS_SYNC_JSON"][i]["line"] + "\r\n"
track['mainArtist'] = {}
track['mainArtist']['id'] = trackAPI['ART_ID']
@ -217,6 +222,7 @@ def getTrackData(id):
track['album']['discTotal'] = albumAPI2['NUMBER_DISK']
return track
def downloadTrack(id, bitrate):
# Get the metadata
track = getTrackData(id)
@ -252,10 +258,12 @@ def downloadTrack(id, bitrate):
track['album']['bitrate'] = track['selectedFormat']
# Create the filename
filename = "{artist} - {title}".format(title=track['title'], artist=track['mainArtist']['name'])+extensions[track['selectedFormat']]
filename = "{artist} - {title}".format(title=track['title'], artist=track['mainArtist']['name']) + extensions[
track['selectedFormat']]
print(filename)
track['downloadUrl'] = dz.get_track_stream_url(track['id'], track['MD5'], track['mediaVersion'], track['selectedFormat'])
track['downloadUrl'] = dz.get_track_stream_url(track['id'], track['MD5'], track['mediaVersion'],
track['selectedFormat'])
with open(filename, 'wb') as stream:
dz.stream_track(track['id'], track['downloadUrl'], stream)
if track['selectedFormat'] in [3, 1, 8]:

View File

@ -1,7 +1,9 @@
#!/usr/bin/env python3
import wx
from deemix.app.functions import downloadTrack, getIDFromLink, getTypeFromLink
class MainFrame(wx.Frame):
def __init__(self):
super().__init__(parent=None, title='deemix')
@ -12,12 +14,12 @@ class MainFrame(wx.Frame):
self.text_ctrl = wx.TextCtrl(panel)
search_sizer.Add(self.text_ctrl, 1, wx.ALL, 5)
my_btn = wx.Button(panel, label='Download')
my_btn.Bind(wx.EVT_BUTTON, self.downloadTrack)
my_btn.Bind(wx.EVT_BUTTON, self.download_track)
search_sizer.Add(my_btn, 0, wx.ALL, 5)
panel.SetSizer(main_sizer)
self.Show()
def downloadTrack(self, event):
def download_track(self, event):
value = self.text_ctrl.GetValue()
if not value:
print("You didn't enter anything!")

View File

@ -1,9 +1,11 @@
#!/usr/bin/env python3
from mutagen.id3 import ID3, ID3NoHeaderError
from mutagen.id3 import TXXX, TIT2, TPE1, TALB, TPE2, TRCK, TPOS, TCON, TYER, TDAT, TLEN, TBPM, TPUB, TSRC, USLT, APIC, IPLS, TCOM, TCOP, TCMP
from mutagen.flac import FLAC, Picture
from urllib.request import urlopen
from mutagen.flac import FLAC, Picture
from mutagen.id3 import ID3, ID3NoHeaderError, TXXX, TIT2, TPE1, TALB, TPE2, TRCK, TPOS, TCON, TYER, TDAT, TLEN, TBPM, \
TPUB, TSRC, USLT, APIC, IPLS, TCOM, TCOP
def tagID3(stream, track):
try:
tag = ID3(stream)
@ -28,21 +30,23 @@ def tagID3(stream, track):
tag.add(TXXX(desc="REPLAYGAIN_TRACK_GAIN", text=track['replayGain']))
if 'unsync' in track['lyrics']:
tag.add(USLT(text=track['lyrics']['unsync']))
involvedPeople = []
involved_people = []
for role in track['contributors']:
if role in ['author', 'engineer', 'mixer', 'producer', 'writer']:
for person in track['contributors'][role]:
involvedPeople.append([role,person])
involved_people.append([role, person])
elif role == 'composer':
tag.add(TCOM(text=track['contributors']['composer']))
if len(involvedPeople) > 0:
tag.add(IPLS(people=involvedPeople))
if len(involved_people) > 0:
tag.add(IPLS(people=involved_people))
tag.add(TCOP(text=track['copyright']))
tag.add(APIC(3, 'image/jpeg', 3, data=urlopen("http://e-cdn-images.deezer.com/images/cover/"+track["album"]['pic']+"/800x800.jpg").read()))
tag.add(APIC(3, 'image/jpeg', 3, data=urlopen(
"http://e-cdn-images.deezer.com/images/cover/" + track["album"]['pic'] + "/800x800.jpg").read()))
tag.save(stream, v1=2, v2_version=3, v23_sep=None)
def tagFLAC(stream, track):
tag = FLAC(stream)