Code parity with deemix-js

This commit is contained in:
RemixDev 2021-06-07 20:25:51 +02:00
parent 69c165e2bc
commit 224a62aad2
No known key found for this signature in database
GPG Key ID: B33962B465BDB51C
21 changed files with 715 additions and 555 deletions

View File

@ -2,11 +2,16 @@
import re
from urllib.request import urlopen
from deemix.itemgen import generateTrackItem, generateAlbumItem, generatePlaylistItem, generateArtistItem, generateArtistDiscographyItem, generateArtistTopItem
from deemix.itemgen import generateTrackItem, \
generateAlbumItem, \
generatePlaylistItem, \
generateArtistItem, \
generateArtistDiscographyItem, \
generateArtistTopItem, \
LinkNotRecognized, \
LinkNotSupported
__version__ = "2.0.16"
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/79.0.3945.130 Safari/537.36"
__version__ = "3.0.0"
# Returns the Resolved URL, the Type and the ID
def parseLink(link):
@ -42,11 +47,20 @@ def parseLink(link):
return (link, link_type, link_id)
def generateDownloadObject(dz, link, bitrate):
def generateDownloadObject(dz, link, bitrate, plugins=None, listener=None):
(link, link_type, link_id) = parseLink(link)
if link_type is None or link_id is None:
return None
if plugins is None: plugins = {}
plugin_names = plugins.keys()
current_plugin = None
item = None
for plugin in plugin_names:
current_plugin = plugins[plugin]
item = current_plugin.generateDownloadObject(dz, link, bitrate, listener)
if item: return item
raise LinkNotRecognized(link)
if link_type == "track":
return generateTrackItem(dz, link_id, bitrate)
if link_type == "album":
@ -54,10 +68,10 @@ def generateDownloadObject(dz, link, bitrate):
if link_type == "playlist":
return generatePlaylistItem(dz, link_id, bitrate)
if link_type == "artist":
return generateArtistItem(dz, link_id, bitrate)
return generateArtistItem(dz, link_id, bitrate, listener)
if link_type == "artist_discography":
return generateArtistDiscographyItem(dz, link_id, bitrate)
return generateArtistDiscographyItem(dz, link_id, bitrate, listener)
if link_type == "artist_top":
return generateArtistTopItem(dz, link_id, bitrate)
return None
raise LinkNotSupported(link)

View File

@ -6,7 +6,7 @@ from deezer import Deezer
from deezer import TrackFormats
from deemix import generateDownloadObject
from deemix.settings import loadSettings
from deemix.settings import load as loadSettings
from deemix.utils import getBitrateNumberFromText
import deemix.utils.localpaths as localpaths
from deemix.downloader import Downloader
@ -62,7 +62,7 @@ def download(url, bitrate, portable, path):
# If first url is filepath readfile and use them as URLs
try:
isfile = Path(url[0]).is_file()
except:
except Exception:
isfile = False
if isfile:
filename = url[0]

View File

@ -1,53 +1,37 @@
import binascii
from ssl import SSLError
from time import sleep
import logging
from Cryptodome.Cipher import Blowfish, AES
from Cryptodome.Hash import MD5
from requests import get
from requests.exceptions import ConnectionError as RequestsConnectionError, ReadTimeout
from requests.exceptions import ConnectionError as RequestsConnectionError, ReadTimeout, ChunkedEncodingError
from urllib3.exceptions import SSLError as u3SSLError
from deemix import USER_AGENT_HEADER
from deemix.utils.crypto import _md5, _ecbCrypt, _ecbDecrypt, generateBlowfishKey, decryptChunk
from deemix.utils import USER_AGENT_HEADER
from deemix.types.DownloadObjects import Single
logger = logging.getLogger('deemix')
def _md5(data):
h = MD5.new()
h.update(data.encode() if isinstance(data, str) else data)
return h.hexdigest()
def generateBlowfishKey(trackId):
SECRET = 'g4el58wc0zvf9na1'
idMd5 = _md5(trackId)
bfKey = ""
for i in range(16):
bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i]))
return bfKey
def generateStreamPath(sng_id, md5, media_version, media_format):
urlPart = b'\xa4'.join(
[md5.encode(), str(media_format).encode(), str(sng_id).encode(), str(media_version).encode()])
md5val = _md5(urlPart)
step2 = md5val.encode() + b'\xa4' + urlPart + b'\xa4'
step2 = step2 + (b'.' * (16 - (len(step2) % 16)))
urlPart = binascii.hexlify(AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).encrypt(step2))
urlPart = _ecbCrypt('jo6aey6haid2Teih', step2)
return urlPart.decode("utf-8")
def reverseStreamPath(urlPart):
step2 = AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).decrypt(binascii.unhexlify(urlPart.encode("utf-8")))
step2 = _ecbDecrypt('jo6aey6haid2Teih', urlPart)
(_, md5, media_format, sng_id, media_version, _) = step2.split(b'\xa4')
return (sng_id.decode('utf-8'), md5.decode('utf-8'), media_version.decode('utf-8'), media_format.decode('utf-8'))
def generateStreamURL(sng_id, md5, media_version, media_format):
def generateCryptedStreamURL(sng_id, md5, media_version, media_format):
urlPart = generateStreamPath(sng_id, md5, media_version, media_format)
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart
def generateUnencryptedStreamURL(sng_id, md5, media_version, media_format):
def generateStreamURL(sng_id, md5, media_version, media_format):
urlPart = generateStreamPath(sng_id, md5, media_version, media_format)
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/api/1/" + urlPart
@ -55,7 +39,8 @@ def reverseStreamURL(url):
urlPart = url[url.find("/1/")+3:]
return reverseStreamPath(urlPart)
def streamUnencryptedTrack(outputStream, track, start=0, downloadObject=None, interface=None):
def streamTrack(outputStream, track, start=0, downloadObject=None, listener=None):
if downloadObject.isCanceled: raise DownloadCanceled
headers= {'User-Agent': USER_AGENT_HEADER}
chunkLength = start
@ -69,9 +54,23 @@ def streamUnencryptedTrack(outputStream, track, start=0, downloadObject=None, in
if complete == 0: raise DownloadEmpty
if start != 0:
responseRange = request.headers["Content-Range"]
logger.info('%s downloading range %s', itemName, responseRange)
if listener:
listener.send('downloadInfo', {
'uuid': downloadObject.uuid,
'itemName': itemName,
'state': "downloading",
'alreadyStarted': True,
'value': responseRange
})
else:
logger.info('%s downloading %s bytes', itemName, complete)
if listener:
listener.send('downloadInfo', {
'uuid': downloadObject.uuid,
'itemName': itemName,
'state': "downloading",
'alreadyStarted': False,
'value': complete
})
for chunk in request.iter_content(2048 * 3):
outputStream.write(chunk)
@ -79,24 +78,24 @@ def streamUnencryptedTrack(outputStream, track, start=0, downloadObject=None, in
if downloadObject:
if isinstance(downloadObject, Single):
percentage = (chunkLength / (complete + start)) * 100
downloadObject.progressNext = percentage
chunkProgres = (chunkLength / (complete + start)) * 100
downloadObject.progressNext = chunkProgres
else:
chunkProgres = (len(chunk) / (complete + start)) / downloadObject.size * 100
downloadObject.progressNext += chunkProgres
downloadObject.updateProgress(interface)
downloadObject.updateProgress(listener)
except (SSLError, u3SSLError):
logger.info('%s retrying from byte %s', itemName, chunkLength)
streamUnencryptedTrack(outputStream, track, chunkLength, downloadObject, interface)
except (RequestsConnectionError, ReadTimeout):
streamTrack(outputStream, track, chunkLength, downloadObject, listener)
except (RequestsConnectionError, ReadTimeout, ChunkedEncodingError):
sleep(2)
streamUnencryptedTrack(outputStream, track, start, downloadObject, interface)
streamTrack(outputStream, track, start, downloadObject, listener)
def streamTrack(outputStream, track, start=0, downloadObject=None, interface=None):
def streamCryptedTrack(outputStream, track, start=0, downloadObject=None, listener=None):
if downloadObject.isCanceled: raise DownloadCanceled
headers= {'User-Agent': USER_AGENT_HEADER}
chunkLength = start
percentage = 0
itemName = f"[{track.mainArtist.name} - {track.title}]"
@ -109,32 +108,49 @@ def streamTrack(outputStream, track, start=0, downloadObject=None, interface=Non
if complete == 0: raise DownloadEmpty
if start != 0:
responseRange = request.headers["Content-Range"]
logger.info('%s downloading range %s', itemName, responseRange)
if listener:
listener.send('downloadInfo', {
'uuid': downloadObject.uuid,
'itemName': itemName,
'state': "downloading",
'alreadyStarted': True,
'value': responseRange
})
else:
logger.info('%s downloading %s bytes', itemName, complete)
if listener:
listener.send('downloadInfo', {
'uuid': downloadObject.uuid,
'itemName': itemName,
'state': "downloading",
'alreadyStarted': False,
'value': complete
})
for chunk in request.iter_content(2048 * 3):
if len(chunk) >= 2048:
chunk = Blowfish.new(blowfish_key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07").decrypt(chunk[0:2048]) + chunk[2048:]
chunk = decryptChunk(blowfish_key, chunk[0:2048]) + chunk[2048:]
outputStream.write(chunk)
chunkLength += len(chunk)
if downloadObject:
if isinstance(downloadObject, Single):
percentage = (chunkLength / (complete + start)) * 100
downloadObject.progressNext = percentage
chunkProgres = (chunkLength / (complete + start)) * 100
downloadObject.progressNext = chunkProgres
else:
chunkProgres = (len(chunk) / (complete + start)) / downloadObject.size * 100
downloadObject.progressNext += chunkProgres
downloadObject.updateProgress(interface)
downloadObject.updateProgress(listener)
except (SSLError, u3SSLError):
logger.info('%s retrying from byte %s', itemName, chunkLength)
streamTrack(outputStream, track, chunkLength, downloadObject, interface)
except (RequestsConnectionError, ReadTimeout):
streamCryptedTrack(outputStream, track, chunkLength, downloadObject, listener)
except (RequestsConnectionError, ReadTimeout, ChunkedEncodingError):
sleep(2)
streamTrack(outputStream, track, start, downloadObject, interface)
streamCryptedTrack(outputStream, track, start, downloadObject, listener)
class DownloadCanceled(Exception):
pass
class DownloadEmpty(Exception):
pass

View File

@ -18,19 +18,17 @@ from urllib3.exceptions import SSLError as u3SSLError
from mutagen.flac import FLACNoHeaderError, error as FLACError
from deezer import TrackFormats
from deemix import USER_AGENT_HEADER
from deemix.types.DownloadObjects import Single, Collection
from deemix.types.Track import Track, AlbumDoesntExists
from deemix.utils.pathtemplates import generateFilename, generateFilepath, settingsRegexAlbum, settingsRegexArtist, settingsRegexPlaylistFile
from deemix.taggers import tagID3, tagFLAC
from deemix.decryption import generateUnencryptedStreamURL, streamUnencryptedTrack
from deemix.types.Track import Track, AlbumDoesntExists, MD5NotFound
from deemix.types.Picture import StaticPicture
from deemix.utils import USER_AGENT_HEADER
from deemix.utils.pathtemplates import generatePath, generateAlbumName, generateArtistName, generateDownloadObjectName
from deemix.tagger import tagID3, tagFLAC
from deemix.decryption import generateStreamURL, streamTrack, DownloadCanceled
from deemix.settings import OverwriteOption
logger = logging.getLogger('deemix')
TEMPDIR = Path(gettempdir()) / 'deemix-imgs'
if not TEMPDIR.is_dir(): makedirs(TEMPDIR)
extensions = {
TrackFormats.FLAC: '.flac',
TrackFormats.LOCAL: '.mp3',
@ -42,52 +40,39 @@ extensions = {
TrackFormats.MP4_RA1: '.mp4'
}
errorMessages = {
'notOnDeezer': "Track not available on Deezer!",
'notEncoded': "Track not yet encoded!",
'notEncodedNoAlternative': "Track not yet encoded and no alternative found!",
'wrongBitrate': "Track not found at desired bitrate.",
'wrongBitrateNoAlternative': "Track not found at desired bitrate and no alternative found!",
'no360RA': "Track is not available in Reality Audio 360.",
'notAvailable': "Track not available on deezer's servers!",
'notAvailableNoAlternative': "Track not available on deezer's servers and no alternative found!",
'noSpaceLeft': "No space left on target drive, clean up some space for the tracks",
'albumDoesntExists': "Track's album does not exsist, failed to gather info"
}
TEMPDIR = Path(gettempdir()) / 'deemix-imgs'
if not TEMPDIR.is_dir(): makedirs(TEMPDIR)
def downloadImage(url, path, overwrite=OverwriteOption.DONT_OVERWRITE):
if not path.is_file() or overwrite in [OverwriteOption.OVERWRITE, OverwriteOption.ONLY_TAGS, OverwriteOption.KEEP_BOTH]:
try:
image = get(url, headers={'User-Agent': USER_AGENT_HEADER}, timeout=30)
image.raise_for_status()
with open(path, 'wb') as f:
f.write(image.content)
return path
except requests.exceptions.HTTPError:
if 'cdns-images.dzcdn.net' in url:
urlBase = url[:url.rfind("/")+1]
pictureUrl = url[len(urlBase):]
pictureSize = int(pictureUrl[:pictureUrl.find("x")])
if pictureSize > 1200:
logger.warning("Couldn't download %sx%s image, falling back to 1200x1200", pictureSize, pictureSize)
sleep(1)
return downloadImage(urlBase+pictureUrl.replace(str(pictureSize)+"x"+str(pictureSize), '1200x1200'), path, overwrite)
logger.error("Image not found: %s", url)
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError, u3SSLError) as e:
logger.error("Couldn't download Image, retrying in 5 seconds...: %s", url)
sleep(5)
return downloadImage(url, path, overwrite)
except OSError as e:
if e.errno == errno.ENOSPC: raise DownloadFailed("noSpaceLeft") from e
logger.exception("Error while downloading an image, you should report this to the developers: %s", e)
except Exception as e:
logger.exception("Error while downloading an image, you should report this to the developers: %s", e)
if path.is_file(): path.unlink()
return None
return path
if path.is_file() and overwrite not in [OverwriteOption.OVERWRITE, OverwriteOption.ONLY_TAGS, OverwriteOption.KEEP_BOTH]: return path
def getPreferredBitrate(track, preferredBitrate, shouldFallback, downloadObjectUUID=None, interface=None):
if track.localTrack: return TrackFormats.LOCAL
try:
image = get(url, headers={'User-Agent': USER_AGENT_HEADER}, timeout=30)
image.raise_for_status()
with open(path, 'wb') as f:
f.write(image.content)
return path
except requests.exceptions.HTTPError:
if path.is_file(): path.unlink()
if 'cdns-images.dzcdn.net' in url:
urlBase = url[:url.rfind("/")+1]
pictureUrl = url[len(urlBase):]
pictureSize = int(pictureUrl[:pictureUrl.find("x")])
if pictureSize > 1200:
return downloadImage(urlBase+pictureUrl.replace(f"{pictureSize}x{pictureSize}", '1200x1200'), path, overwrite)
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError, u3SSLError) as e:
if path.is_file(): path.unlink()
sleep(5)
return downloadImage(url, path, overwrite)
except OSError as e:
if path.is_file(): path.unlink()
if e.errno == errno.ENOSPC: raise DownloadFailed("noSpaceLeft") from e
logger.exception("Error while downloading an image, you should report this to the developers: %s", e)
return None
def getPreferredBitrate(track, bitrate, shouldFallback, uuid=None, listener=None):
bitrate = int(bitrate)
if track.local: return TrackFormats.LOCAL
falledBack = False
@ -102,7 +87,7 @@ def getPreferredBitrate(track, preferredBitrate, shouldFallback, downloadObjectU
TrackFormats.MP4_RA1: "MP4_RA1",
}
is360format = int(preferredBitrate) in formats_360
is360format = bitrate in formats_360.keys()
if not shouldFallback:
formats = formats_360
@ -112,30 +97,36 @@ def getPreferredBitrate(track, preferredBitrate, shouldFallback, downloadObjectU
else:
formats = formats_non_360
def testBitrate(track, formatNumber, formatName):
request = requests.head(
generateStreamURL(track.id, track.MD5, track.mediaVersion, formatNumber),
headers={'User-Agent': USER_AGENT_HEADER},
timeout=30
)
try:
request.raise_for_status()
track.filesizes[f"FILESIZE_{formatName}"] = request.headers["Content-Length"]
track.filesizes[f"FILESIZE_{formatName}_TESTED"] = True
return formatNumber
except requests.exceptions.HTTPError: # if the format is not available, Deezer returns a 403 error
return None
for formatNumber, formatName in formats.items():
if formatNumber >= int(preferredBitrate): continue
if formatNumber >= int(bitrate): continue
if f"FILESIZE_{formatName}" in track.filesizes:
if int(track.filesizes[f"FILESIZE_{formatName}"]) != 0: return formatNumber
if not track.filesizes[f"FILESIZE_{formatName}_TESTED"]:
request = requests.head(
generateUnencryptedStreamURL(track.id, track.MD5, track.mediaVersion, formatNumber),
headers={'User-Agent': USER_AGENT_HEADER},
timeout=30
)
try:
request.raise_for_status()
return formatNumber
except requests.exceptions.HTTPError: # if the format is not available, Deezer returns a 403 error
pass
testedBitrate = testBitrate(track, formatNumber, formatName)
if testedBitrate: return testedBitrate
if not shouldFallback:
raise PreferredBitrateNotFound
if not falledBack:
falledBack = True
logger.info("%s Fallback to lower bitrate", f"[{track.mainArtist.name} - {track.title}]")
if interface and downloadObjectUUID:
interface.send('queueUpdate', {
'uuid': downloadObjectUUID,
if listener and uuid:
listener.send('queueUpdate', {
'uuid': uuid,
'bitrateFallback': True,
'data': {
'id': track.id,
@ -147,32 +138,52 @@ def getPreferredBitrate(track, preferredBitrate, shouldFallback, downloadObjectU
return TrackFormats.DEFAULT
class Downloader:
def __init__(self, dz, downloadObject, settings, interface=None):
def __init__(self, dz, downloadObject, settings, listener=None):
self.dz = dz
self.downloadObject = downloadObject
self.settings = settings
self.bitrate = downloadObject.bitrate
self.interface = interface
self.listener = listener
self.extrasPath = None
self.playlistCoverName = None
self.playlistURLs = []
def start(self):
if self.downloadObject.isCanceled:
if self.listener:
self.listener.send('currentItemCancelled', self.downloadObject.uuid)
self.listener.send("removedFromQueue", self.downloadObject.uuid)
return
if isinstance(self.downloadObject, Single):
result = self.downloadWrapper(self.downloadObject.single['trackAPI_gw'], self.downloadObject.single['trackAPI'], self.downloadObject.single['albumAPI'])
if result: self.singleAfterDownload(result)
track = self.downloadWrapper({
'trackAPI_gw': self.downloadObject.single['trackAPI_gw'],
'trackAPI': self.downloadObject.single['trackAPI'],
'albumAPI': self.downloadObject.single['albumAPI']
})
if track: self.afterDownloadSingle(track)
elif isinstance(self.downloadObject, Collection):
tracks = [None] * len(self.downloadObject.collection['tracks_gw'])
with ThreadPoolExecutor(self.settings['queueConcurrency']) as executor:
for pos, track in enumerate(self.downloadObject.collection['tracks_gw'], start=0):
tracks[pos] = executor.submit(self.downloadWrapper, track, None, self.downloadObject.collection['albumAPI'], self.downloadObject.collection['playlistAPI'])
self.collectionAfterDownload(tracks)
if self.interface:
self.interface.send("finishDownload", self.downloadObject.uuid)
return self.extrasPath
tracks[pos] = executor.submit(self.downloadWrapper, {
'trackAPI_gw': track,
'albumAPI': self.downloadObject.collection['albumAPI'],
'playlistAPI': self.downloadObject.collection['playlistAPI']
})
self.afterDownloadCollection(tracks)
def download(self, trackAPI_gw, trackAPI=None, albumAPI=None, playlistAPI=None, track=None):
result = {}
if self.listener:
self.listener.send("finishDownload", self.downloadObject.uuid)
def download(self, extraData, track=None):
returnData = {}
trackAPI_gw = extraData['trackAPI_gw']
trackAPI = extraData['trackAPI']
albumAPI = extraData['albumAPI']
playlistAPI = extraData['playlistAPI']
if self.downloadObject.isCanceled: raise DownloadCanceled
if trackAPI_gw['SNG_ID'] == "0": raise DownloadFailed("notOnDeezer")
itemName = f"[{trackAPI_gw['ART_NAME']} - {trackAPI_gw['SNG_TITLE']}]"
@ -190,6 +201,8 @@ class Downloader:
)
except AlbumDoesntExists as e:
raise DownloadError('albumDoesntExists') from e
except MD5NotFound as e:
raise DownloadError('notLoggedIn') from e
itemName = f"[{track.mainArtist.name} - {track.title}]"
@ -202,36 +215,37 @@ class Downloader:
track,
self.bitrate,
self.settings['fallbackBitrate'],
self.downloadObject.uuid, self.interface
self.downloadObject.uuid, self.listener
)
except PreferredBitrateNotFound as e:
raise DownloadFailed("wrongBitrate", track) from e
except TrackNot360 as e:
raise DownloadFailed("no360RA") from e
track.selectedFormat = selectedFormat
track.bitrate = selectedFormat
track.album.bitrate = selectedFormat
# Apply settings
track.applySettings(self.settings)
# Generate filename and filepath from metadata
(filename, filepath, artistPath, coverPath, extrasPath) = generatePath(track, self.downloadObject, self.settings)
# Make sure the filepath exists
makedirs(filepath, exist_ok=True)
extension = extensions[track.bitrate]
writepath = filepath / f"{filename}{extension}"
# Save extrasPath
if extrasPath and not self.extrasPath: self.extrasPath = extrasPath
# Generate covers URLs
embeddedImageFormat = f'jpg-{self.settings["jpegImageQuality"]}'
if self.settings['embeddedArtworkPNG']: embeddedImageFormat = 'png'
track.applySettings(self.settings, TEMPDIR, embeddedImageFormat)
# Generate filename and filepath from metadata
filename = generateFilename(track, self.settings, "%artist% - %title%")
(filepath, artistPath, coverPath, extrasPath) = generateFilepath(track, self.settings)
# Remove subfolders from filename and add it to filepath
if pathSep in filename:
tempPath = filename[:filename.rfind(pathSep)]
filepath = filepath / tempPath
filename = filename[filename.rfind(pathSep) + len(pathSep):]
# Make sure the filepath exists
makedirs(filepath, exist_ok=True)
writepath = filepath / f"{filename}{extensions[track.selectedFormat]}"
# Save extrasPath
if extrasPath:
if not self.extrasPath: self.extrasPath = extrasPath
result['filename'] = str(writepath)[len(str(extrasPath))+ len(pathSep):]
track.album.embeddedCoverURL = track.album.pic.getURL(self.settings['embeddedArtworkSize'], embeddedImageFormat)
ext = track.album.embeddedCoverURL[-4:]
if ext[0] != ".": ext = ".jpg" # Check for Spotify images
track.album.embeddedCoverPath = TEMPDIR / ((f"pl{track.playlist.id}" if track.album.isPlaylist else f"alb{track.album.id}") + f"_{self.settings['embeddedArtworkSize']}{ext}")
# Download and cache coverart
logger.info("%s Getting the album cover", itemName)
@ -239,48 +253,46 @@ class Downloader:
# Save local album art
if coverPath:
result['albumURLs'] = []
returnData['albumURLs'] = []
for pic_format in self.settings['localArtworkFormat'].split(","):
if pic_format in ["png","jpg"]:
extendedFormat = pic_format
if extendedFormat == "jpg": extendedFormat += f"-{self.settings['jpegImageQuality']}"
url = track.album.pic.generatePictureURL(self.settings['localArtworkSize'], extendedFormat)
if self.settings['tags']['savePlaylistAsCompilation'] \
and track.playlist \
and track.playlist.pic.staticUrl \
and not pic_format.startswith("jpg"):
url = track.album.pic.getURL(self.settings['localArtworkSize'], extendedFormat)
# Skip non deezer pictures at the wrong format
if isinstance(track.album.pic, StaticPicture) and pic_format != "jpg":
continue
result['albumURLs'].append({'url': url, 'ext': pic_format})
result['albumPath'] = coverPath
result['albumFilename'] = f"{settingsRegexAlbum(self.settings['coverImageTemplate'], track.album, self.settings, track.playlist)}"
returnData['albumURLs'].append({'url': url, 'ext': pic_format})
returnData['albumPath'] = coverPath
returnData['albumFilename'] = generateAlbumName(self.settings['coverImageTemplate'], track.album, self.settings, track.playlist)
# Save artist art
if artistPath:
result['artistURLs'] = []
returnData['artistURLs'] = []
for pic_format in self.settings['localArtworkFormat'].split(","):
if pic_format in ["png","jpg"]:
extendedFormat = pic_format
if extendedFormat == "jpg": extendedFormat += f"-{self.settings['jpegImageQuality']}"
url = track.album.mainArtist.pic.generatePictureURL(self.settings['localArtworkSize'], extendedFormat)
if track.album.mainArtist.pic.md5 == "" and not pic_format.startswith("jpg"): continue
result['artistURLs'].append({'url': url, 'ext': pic_format})
result['artistPath'] = artistPath
result['artistFilename'] = f"{settingsRegexArtist(self.settings['artistImageTemplate'], track.album.mainArtist, self.settings, rootArtist=track.album.rootArtist)}"
# Deezer doesn't support png artist images
if pic_format == "jpg":
extendedFormat = f"{pic_format}-{self.settings['jpegImageQuality']}"
url = track.album.mainArtist.pic.getURL(self.settings['localArtworkSize'], extendedFormat)
if track.album.mainArtist.pic.md5 == "": continue
returnData['artistURLs'].append({'url': url, 'ext': pic_format})
returnData['artistPath'] = artistPath
returnData['artistFilename'] = generateArtistName(self.settings['artistImageTemplate'], track.album.mainArtist, self.settings, rootArtist=track.album.rootArtist)
# Save playlist art
if track.playlist:
if self.playlistURLs == []:
if len(self.playlistURLs) == 0:
for pic_format in self.settings['localArtworkFormat'].split(","):
if pic_format in ["png","jpg"]:
extendedFormat = pic_format
if extendedFormat == "jpg": extendedFormat += f"-{self.settings['jpegImageQuality']}"
url = track.playlist.pic.generatePictureURL(self.settings['localArtworkSize'], extendedFormat)
if track.playlist.pic.staticUrl and not pic_format.startswith("jpg"): continue
url = track.playlist.pic.getURL(self.settings['localArtworkSize'], extendedFormat)
if isinstance(track.playlist.pic, StaticPicture) and pic_format != "jpg": continue
self.playlistURLs.append({'url': url, 'ext': pic_format})
if not self.playlistCoverName:
track.playlist.bitrate = selectedFormat
track.playlist.dateString = track.playlist.date.format(self.settings['dateFormat'])
self.playlistCoverName = f"{settingsRegexAlbum(self.settings['coverImageTemplate'], track.playlist, self.settings, track.playlist)}"
self.playlistCoverName = generateAlbumName(self.settings['coverImageTemplate'], track.playlist, self.settings, track.playlist)
# Save lyrics in lrc file
if self.settings['syncedLyrics'] and track.lyrics.sync:
@ -301,106 +313,67 @@ class Downloader:
# Don't overwrite and keep both files
if trackAlreadyDownloaded and self.settings['overwriteFile'] == OverwriteOption.KEEP_BOTH:
baseFilename = str(filepath / filename)
i = 1
currentFilename = baseFilename+' ('+str(i)+')'+ extensions[track.selectedFormat]
c = 1
currentFilename = baseFilename+' ('+str(c)+')'+ extension
while Path(currentFilename).is_file():
i += 1
currentFilename = baseFilename+' ('+str(i)+')'+ extensions[track.selectedFormat]
c += 1
currentFilename = baseFilename+' ('+str(c)+')'+ extension
trackAlreadyDownloaded = False
writepath = Path(currentFilename)
if not trackAlreadyDownloaded or self.settings['overwriteFile'] == OverwriteOption.OVERWRITE:
logger.info("%s Downloading the track", itemName)
track.downloadUrl = generateUnencryptedStreamURL(track.id, track.MD5, track.mediaVersion, track.selectedFormat)
def downloadMusic(track, trackAPI_gw):
try:
with open(writepath, 'wb') as stream:
streamUnencryptedTrack(stream, track, downloadObject=self.downloadObject, interface=self.interface)
except DownloadCancelled as e:
if writepath.is_file(): writepath.unlink()
raise e
except (requests.exceptions.HTTPError, DownloadEmpty) as e:
if writepath.is_file(): writepath.unlink()
if track.fallbackID != "0":
logger.warning("%s Track not available, using fallback id", itemName)
newTrack = self.dz.gw.get_track_with_fallback(track.fallbackID)
track.parseEssentialData(newTrack)
track.retriveFilesizes(self.dz)
return False
if not track.searched and self.settings['fallbackSearch']:
logger.warning("%s Track not available, searching for alternative", itemName)
searchedId = self.dz.api.get_track_id_from_metadata(track.mainArtist.name, track.title, track.album.title)
if searchedId != "0":
newTrack = self.dz.gw.get_track_with_fallback(searchedId)
track.parseEssentialData(newTrack)
track.retriveFilesizes(self.dz)
track.searched = True
if self.interface:
self.interface.send('queueUpdate', {
'uuid': self.downloadObject.uuid,
'searchFallback': True,
'data': {
'id': track.id,
'title': track.title,
'artist': track.mainArtist.name
},
})
return False
raise DownloadFailed("notAvailableNoAlternative") from e
raise DownloadFailed("notAvailable") from e
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as e:
if writepath.is_file(): writepath.unlink()
logger.warning("%s Error while downloading the track, trying again in 5s...", itemName)
sleep(5)
return downloadMusic(track, trackAPI_gw)
except OSError as e:
if writepath.is_file(): writepath.unlink()
if e.errno == errno.ENOSPC: raise DownloadFailed("noSpaceLeft") from e
logger.exception("%s Error while downloading the track, you should report this to the developers: %s", itemName, e)
raise e
except Exception as e:
if writepath.is_file(): writepath.unlink()
logger.exception("%s Error while downloading the track, you should report this to the developers: %s", itemName, e)
raise e
return True
track.downloadUrl = generateStreamURL(track.id, track.MD5, track.mediaVersion, track.bitrate)
try:
trackDownloaded = downloadMusic(track, trackAPI_gw)
except Exception as e:
with open(writepath, 'wb') as stream:
streamTrack(stream, track, downloadObject=self.downloadObject, listener=self.listener)
except OSError as e:
if writepath.is_file(): writepath.unlink()
if e.errno == errno.ENOSPC: raise DownloadFailed("noSpaceLeft") from e
raise e
if not trackDownloaded: return self.download(trackAPI_gw, track=track)
else:
logger.info("%s Skipping track as it's already downloaded", itemName)
self.downloadObject.completeTrackProgress(self.interface)
self.downloadObject.completeTrackProgress(self.listener)
# Adding tags
if (not trackAlreadyDownloaded or self.settings['overwriteFile'] in [OverwriteOption.ONLY_TAGS, OverwriteOption.OVERWRITE]) and not track.localTrack:
if (not trackAlreadyDownloaded or self.settings['overwriteFile'] in [OverwriteOption.ONLY_TAGS, OverwriteOption.OVERWRITE]) and not track.local:
logger.info("%s Applying tags to the track", itemName)
if track.selectedFormat in [TrackFormats.MP3_320, TrackFormats.MP3_128, TrackFormats.DEFAULT]:
if extension == '.mp3':
tagID3(writepath, track, self.settings['tags'])
elif track.selectedFormat == TrackFormats.FLAC:
elif extension == '.flac':
try:
tagFLAC(writepath, track, self.settings['tags'])
except (FLACNoHeaderError, FLACError):
if writepath.is_file(): writepath.unlink()
writepath.unlink()
logger.warning("%s Track not available in FLAC, falling back if necessary", itemName)
self.downloadObject.removeTrackProgress(self.interface)
self.downloadObject.removeTrackProgress(self.listener)
track.filesizes['FILESIZE_FLAC'] = "0"
track.filesizes['FILESIZE_FLAC_TESTED'] = True
return self.download(trackAPI_gw, track=track)
if track.searched: result['searched'] = f"{track.mainArtist.name} - {track.title}"
logger.info("%s Track download completed\n%s", itemName, writepath)
if track.searched: returnData['searched'] = True
self.downloadObject.downloaded += 1
self.downloadObject.files.append(str(writepath))
self.downloadObject.extrasPath = str(self.extrasPath)
if self.interface:
self.interface.send("updateQueue", {'uuid': self.downloadObject.uuid, 'downloaded': True, 'downloadPath': str(writepath), 'extrasPath': str(self.extrasPath)})
return result
logger.info("%s Track download completed\n%s", itemName, writepath)
if self.listener: self.listener.send("updateQueue", {
'uuid': self.downloadObject.uuid,
'downloaded': True,
'downloadPath': str(writepath),
'extrasPath': str(self.extrasPath)
})
returnData['filename'] = str(writepath)[len(str(extrasPath))+ len(pathSep):]
returnData['data'] = {
'id': track.id,
'title': track.title,
'artist': track.mainArtist.name
}
return returnData
def downloadWrapper(self, trackAPI_gw, trackAPI=None, albumAPI=None, playlistAPI=None, track=None):
def downloadWrapper(self, extraData, track=None):
trackAPI_gw = extraData['trackAPI_gw']
# Temp metadata to generate logs
tempTrack = {
'id': trackAPI_gw['SNG_ID'],
@ -413,7 +386,7 @@ class Downloader:
itemName = f"[{track.mainArtist.name} - {track.title}]"
try:
result = self.download(trackAPI_gw, trackAPI, albumAPI, playlistAPI, track)
result = self.download(extraData, track)
except DownloadFailed as error:
if error.track:
track = error.track
@ -422,7 +395,7 @@ class Downloader:
newTrack = self.dz.gw.get_track_with_fallback(track.fallbackID)
track.parseEssentialData(newTrack)
track.retriveFilesizes(self.dz)
return self.downloadWrapper(trackAPI_gw, trackAPI, albumAPI, playlistAPI, track)
return self.downloadWrapper(extraData, track)
if not track.searched and self.settings['fallbackSearch']:
logger.warning("%s %s Searching for alternative", itemName, error.message)
searchedId = self.dz.api.get_track_id_from_metadata(track.mainArtist.name, track.title, track.album.title)
@ -431,19 +404,18 @@ class Downloader:
track.parseEssentialData(newTrack)
track.retriveFilesizes(self.dz)
track.searched = True
if self.interface:
self.interface.send('queueUpdate', {
'uuid': self.downloadObject.uuid,
'searchFallback': True,
'data': {
'id': track.id,
'title': track.title,
'artist': track.mainArtist.name
},
})
return self.downloadWrapper(trackAPI_gw, trackAPI, albumAPI, playlistAPI, track)
error.errid += "NoAlternative"
error.message = errorMessages[error.errid]
if self.listener: self.listener.send('queueUpdate', {
'uuid': self.downloadObject.uuid,
'searchFallback': True,
'data': {
'id': track.id,
'title': track.title,
'artist': track.mainArtist.name
},
})
return self.downloadWrapper(extraData, track)
error.errid += "NoAlternative"
error.message = errorMessages[error.errid]
logger.error("%s %s", itemName, error.message)
result = {'error': {
'message': error.message,
@ -453,17 +425,17 @@ class Downloader:
except Exception as e:
logger.exception("%s %s", itemName, e)
result = {'error': {
'message': str(e),
'data': tempTrack
}}
'message': str(e),
'data': tempTrack
}}
if 'error' in result:
self.downloadObject.completeTrackProgress(self.interface)
self.downloadObject.completeTrackProgress(self.listener)
self.downloadObject.failed += 1
self.downloadObject.errors.append(result['error'])
if self.interface:
if self.listener:
error = result['error']
self.interface.send("updateQueue", {
self.listener.send("updateQueue", {
'uuid': self.downloadObject.uuid,
'failed': True,
'data': error['data'],
@ -472,61 +444,63 @@ class Downloader:
})
return result
def singleAfterDownload(self, result):
def afterDownloadSingle(self, track):
if not self.extrasPath: self.extrasPath = Path(self.settings['downloadLocation'])
# Save Album Cover
if self.settings['saveArtwork'] and 'albumPath' in result:
for image in result['albumURLs']:
downloadImage(image['url'], result['albumPath'] / f"{result['albumFilename']}.{image['ext']}", self.settings['overwriteFile'])
if self.settings['saveArtwork'] and 'albumPath' in track:
for image in track['albumURLs']:
downloadImage(image['url'], track['albumPath'] / f"{track['albumFilename']}.{image['ext']}", self.settings['overwriteFile'])
# Save Artist Artwork
if self.settings['saveArtworkArtist'] and 'artistPath' in result:
for image in result['artistURLs']:
downloadImage(image['url'], result['artistPath'] / f"{result['artistFilename']}.{image['ext']}", self.settings['overwriteFile'])
if self.settings['saveArtworkArtist'] and 'artistPath' in track:
for image in track['artistURLs']:
downloadImage(image['url'], track['artistPath'] / f"{track['artistFilename']}.{image['ext']}", self.settings['overwriteFile'])
# Create searched logfile
if self.settings['logSearched'] and 'searched' in result:
if self.settings['logSearched'] and 'searched' in track:
filename = f"{track.data.artist} - {track.data.title}"
with open(self.extrasPath / 'searched.txt', 'wb+') as f:
orig = f.read().decode('utf-8')
if not result['searched'] in orig:
if orig != "": orig += "\r\n"
orig += result['searched'] + "\r\n"
f.write(orig.encode('utf-8'))
searchedFile = f.read().decode('utf-8')
if not filename in searchedFile:
if searchedFile != "": searchedFile += "\r\n"
searchedFile += filename + "\r\n"
f.write(searchedFile.encode('utf-8'))
# Execute command after download
if self.settings['executeCommand'] != "":
execute(self.settings['executeCommand'].replace("%folder%", quote(str(self.extrasPath))).replace("%filename%", quote(result['filename'])), shell=True)
execute(self.settings['executeCommand'].replace("%folder%", quote(str(self.extrasPath))).replace("%filename%", quote(track['filename'])), shell=True)
def collectionAfterDownload(self, tracks):
def afterDownloadCollection(self, tracks):
if not self.extrasPath: self.extrasPath = Path(self.settings['downloadLocation'])
playlist = [None] * len(tracks)
errors = ""
searched = ""
for i in enumerate(tracks):
result = tracks[i].result()
if not result: return # Check if item is cancelled
for i, track in enumerate(tracks):
track = track.result()
if not track: return # Check if item is cancelled
# Log errors to file
if result.get('error'):
if not result['error'].get('data'): result['error']['data'] = {'id': "0", 'title': 'Unknown', 'artist': 'Unknown'}
errors += f"{result['error']['data']['id']} | {result['error']['data']['artist']} - {result['error']['data']['title']} | {result['error']['message']}\r\n"
if track.get('error'):
if not track['error'].get('data'): track['error']['data'] = {'id': "0", 'title': 'Unknown', 'artist': 'Unknown'}
errors += f"{track['error']['data']['id']} | {track['error']['data']['artist']} - {track['error']['data']['title']} | {track['error']['message']}\r\n"
# Log searched to file
if 'searched' in result: searched += result['searched'] + "\r\n"
if 'searched' in track: searched += track['searched'] + "\r\n"
# Save Album Cover
if self.settings['saveArtwork'] and 'albumPath' in result:
for image in result['albumURLs']:
downloadImage(image['url'], result['albumPath'] / f"{result['albumFilename']}.{image['ext']}", self.settings['overwriteFile'])
if self.settings['saveArtwork'] and 'albumPath' in track:
for image in track['albumURLs']:
downloadImage(image['url'], track['albumPath'] / f"{track['albumFilename']}.{image['ext']}", self.settings['overwriteFile'])
# Save Artist Artwork
if self.settings['saveArtworkArtist'] and 'artistPath' in result:
for image in result['artistURLs']:
downloadImage(image['url'], result['artistPath'] / f"{result['artistFilename']}.{image['ext']}", self.settings['overwriteFile'])
if self.settings['saveArtworkArtist'] and 'artistPath' in track:
for image in track['artistURLs']:
downloadImage(image['url'], track['artistPath'] / f"{track['artistFilename']}.{image['ext']}", self.settings['overwriteFile'])
# Save filename for playlist file
playlist[i] = result.get('filename', "")
playlist[i] = track.get('filename', "")
# Create errors logfile
if self.settings['logErrors'] and errors != "":
@ -545,7 +519,7 @@ class Downloader:
# Create M3U8 File
if self.settings['createM3U8File']:
filename = settingsRegexPlaylistFile(self.settings['playlistFilenameTemplate'], self.downloadObject, self.settings) or "playlist"
filename = generateDownloadObjectName(self.settings['playlistFilenameTemplate'], self.downloadObject, self.settings) or "playlist"
with open(self.extrasPath / f'{filename}.m3u8', 'wb') as f:
for line in playlist:
f.write((line + "\n").encode('utf-8'))
@ -557,6 +531,19 @@ class Downloader:
class DownloadError(Exception):
"""Base class for exceptions in this module."""
errorMessages = {
'notOnDeezer': "Track not available on Deezer!",
'notEncoded': "Track not yet encoded!",
'notEncodedNoAlternative': "Track not yet encoded and no alternative found!",
'wrongBitrate': "Track not found at desired bitrate.",
'wrongBitrateNoAlternative': "Track not found at desired bitrate and no alternative found!",
'no360RA': "Track is not available in Reality Audio 360.",
'notAvailable': "Track not available on deezer's servers!",
'notAvailableNoAlternative': "Track not available on deezer's servers and no alternative found!",
'noSpaceLeft': "No space left on target drive, clean up some space for the tracks",
'albumDoesntExists': "Track's album does not exsist, failed to gather info"
}
class DownloadFailed(DownloadError):
def __init__(self, errid, track=None):
super().__init__()
@ -564,12 +551,6 @@ class DownloadFailed(DownloadError):
self.message = errorMessages[self.errid]
self.track = track
class DownloadCancelled(DownloadError):
pass
class DownloadEmpty(DownloadError):
pass
class PreferredBitrateNotFound(DownloadError):
pass

View File

@ -1,46 +1,31 @@
import logging
from deemix.types.DownloadObjects import Single, Collection
from deezer.utils import map_user_playlist
from deezer.api import APIError
from deezer.gw import GWAPIError, LyricsStatus
from deezer.api import APIError
from deezer.utils import map_user_playlist
logger = logging.getLogger('deemix')
class GenerationError(Exception):
def __init__(self, link, message, errid=None):
super().__init__()
self.link = link
self.message = message
self.errid = errid
def toDict(self):
return {
'link': self.link,
'error': self.message,
'errid': self.errid
}
def generateTrackItem(dz, link_id, bitrate, trackAPI=None, albumAPI=None):
# Check if is an isrc: url
if str(link_id).startswith("isrc"):
try:
trackAPI = dz.api.get_track(link_id)
except APIError as e:
raise GenerationError("https://deezer.com/track/"+str(link_id), f"Wrong URL: {e}") from e
raise GenerationError(f"https://deezer.com/track/{link_id}", str(e)) from e
if 'id' in trackAPI and 'title' in trackAPI:
link_id = trackAPI['id']
else:
raise GenerationError("https://deezer.com/track/"+str(link_id), "Track ISRC is not available on deezer", "ISRCnotOnDeezer")
raise ISRCnotOnDeezer(f"https://deezer.com/track/{link_id}")
if not link_id.isdecimal(): raise InvalidID(f"https://deezer.com/track/{link_id}")
# Get essential track info
try:
trackAPI_gw = dz.gw.get_track_with_fallback(link_id)
except GWAPIError as e:
message = "Wrong URL"
# TODO: FIX
# if "DATA_ERROR" in e: message += f": {e['DATA_ERROR']}"
raise GenerationError("https://deezer.com/track/"+str(link_id), message) from e
raise GenerationError(f"https://deezer.com/track/{link_id}", str(e)) from e
title = trackAPI_gw['SNG_TITLE'].strip()
if trackAPI_gw.get('VERSION') and trackAPI_gw['VERSION'] not in trackAPI_gw['SNG_TITLE']:
@ -67,20 +52,24 @@ def generateAlbumItem(dz, link_id, bitrate, rootArtist=None):
try:
albumAPI = dz.api.get_album(link_id)
except APIError as e:
raise GenerationError("https://deezer.com/album/"+str(link_id), f"Wrong URL: {e}") from e
raise GenerationError(f"https://deezer.com/album/{link_id}", str(e)) from e
if str(link_id).startswith('upc'): link_id = albumAPI['id']
if not link_id.isdecimal(): raise InvalidID(f"https://deezer.com/album/{link_id}")
# Get extra info about album
# This saves extra api calls when downloading
albumAPI_gw = dz.gw.get_album(link_id)
albumAPI['nb_disk'] = albumAPI_gw['NUMBER_DISK']
albumAPI['copyright'] = albumAPI_gw['COPYRIGHT']
albumAPI['release_date'] = albumAPI_gw['PHYSICAL_RELEASE_DATE']
albumAPI['root_artist'] = rootArtist
# If the album is a single download as a track
if albumAPI['nb_tracks'] == 1:
return generateTrackItem(dz, albumAPI['tracks']['data'][0]['id'], bitrate, albumAPI=albumAPI)
if len(albumAPI['tracks']['data']):
return generateTrackItem(dz, albumAPI['tracks']['data'][0]['id'], bitrate, albumAPI=albumAPI)
raise GenerationError(f"https://deezer.com/album/{link_id}", "Single has no tracks.")
tracksArray = dz.gw.get_album_tracks(link_id)
@ -116,6 +105,7 @@ def generateAlbumItem(dz, link_id, bitrate, rootArtist=None):
def generatePlaylistItem(dz, link_id, bitrate, playlistAPI=None, playlistTracksAPI=None):
if not playlistAPI:
if not link_id.isdecimal(): raise InvalidID(f"https://deezer.com/playlist/{link_id}")
# Get essential playlist info
try:
playlistAPI = dz.api.get_playlist(link_id)
@ -127,15 +117,12 @@ def generatePlaylistItem(dz, link_id, bitrate, playlistAPI=None, playlistTracksA
userPlaylist = dz.gw.get_playlist_page(link_id)
playlistAPI = map_user_playlist(userPlaylist['DATA'])
except GWAPIError as e:
message = "Wrong URL"
# TODO: FIX
# if "DATA_ERROR" in e: message += f": {e['DATA_ERROR']}"
raise GenerationError("https://deezer.com/playlist/"+str(link_id), message) from e
raise GenerationError(f"https://deezer.com/playlist/{link_id}", str(e)) from e
# Check if private playlist and owner
if not playlistAPI.get('public', False) and playlistAPI['creator']['id'] != str(dz.current_user['id']):
logger.warning("You can't download others private playlists.")
raise GenerationError("https://deezer.com/playlist/"+str(link_id), "You can't download others private playlists.", "notYourPrivatePlaylist")
raise NotYourPrivatePlaylist(f"https://deezer.com/playlist/{link_id}")
if not playlistTracksAPI:
playlistTracksAPI = dz.gw.get_playlist_tracks(link_id)
@ -168,73 +155,82 @@ def generatePlaylistItem(dz, link_id, bitrate, playlistAPI=None, playlistTracksA
}
})
def generateArtistItem(dz, link_id, bitrate, interface=None):
def generateArtistItem(dz, link_id, bitrate, listener=None):
if not link_id.isdecimal(): raise InvalidID(f"https://deezer.com/artist/{link_id}")
# Get essential artist info
try:
artistAPI = dz.api.get_artist(link_id)
except APIError as e:
raise GenerationError("https://deezer.com/artist/"+str(link_id), f"Wrong URL: {e}") from e
raise GenerationError(f"https://deezer.com/artist/{link_id}", str(e)) from e
rootArtist = {
'id': artistAPI['id'],
'name': artistAPI['name']
'name': artistAPI['name'],
'picture_small': artistAPI['picture_small']
}
if interface: interface.send("startAddingArtist", rootArtist)
if listener: listener.send("startAddingArtist", rootArtist)
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(link_id, 100)
allReleases = artistDiscographyAPI.pop('all', [])
albumList = []
for album in allReleases:
albumList.append(generateAlbumItem(dz, album['id'], bitrate, rootArtist=rootArtist))
try:
albumList.append(generateAlbumItem(dz, album['id'], bitrate, rootArtist=rootArtist))
except GenerationError as e:
logger.warning("Album %s has no data: %s", str(album['id']), str(e))
if interface: interface.send("finishAddingArtist", rootArtist)
if listener: listener.send("finishAddingArtist", rootArtist)
return albumList
def generateArtistDiscographyItem(dz, link_id, bitrate, interface=None):
def generateArtistDiscographyItem(dz, link_id, bitrate, listener=None):
if not link_id.isdecimal(): raise InvalidID(f"https://deezer.com/artist/{link_id}/discography")
# Get essential artist info
try:
artistAPI = dz.api.get_artist(link_id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/artist/"+str(link_id)+"/discography", f"Wrong URL: {e}")
raise GenerationError(f"https://deezer.com/artist/{link_id}/discography", str(e)) from e
rootArtist = {
'id': artistAPI['id'],
'name': artistAPI['name']
'name': artistAPI['name'],
'picture_small': artistAPI['picture_small']
}
if interface: interface.send("startAddingArtist", rootArtist)
if listener: listener.send("startAddingArtist", rootArtist)
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(link_id, 100)
artistDiscographyAPI.pop('all', None) # all contains albums and singles, so its all duplicates. This removes them
albumList = []
for releaseType in artistDiscographyAPI:
for album in artistDiscographyAPI[releaseType]:
albumList.append(generateAlbumItem(dz, album['id'], bitrate, rootArtist=rootArtist))
try:
albumList.append(generateAlbumItem(dz, album['id'], bitrate, rootArtist=rootArtist))
except GenerationError as e:
logger.warning("Album %s has no data: %s", str(album['id']), str(e))
if interface: interface.send("finishAddingArtist", rootArtist)
if listener: listener.send("finishAddingArtist", rootArtist)
return albumList
def generateArtistTopItem(dz, link_id, bitrate, interface=None):
def generateArtistTopItem(dz, link_id, bitrate):
if not link_id.isdecimal(): raise InvalidID(f"https://deezer.com/artist/{link_id}/top_track")
# Get essential artist info
try:
artistAPI = dz.api.get_artist(link_id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/artist/"+str(link_id)+"/top_track", f"Wrong URL: {e}")
raise GenerationError(f"https://deezer.com/artist/{link_id}/top_track", str(e)) from e
# Emulate the creation of a playlist
# Can't use generatePlaylistItem directly as this is not a real playlist
playlistAPI = {
'id': str(artistAPI['id'])+"_top_track",
'title': artistAPI['name']+" - Top Tracks",
'description': "Top Tracks for "+artistAPI['name'],
'id':f"{artistAPI['id']}_top_track",
'title': f"{artistAPI['name']} - Top Tracks",
'description': f"Top Tracks for {artistAPI['name']}",
'duration': 0,
'public': True,
'is_loved_track': False,
'collaborative': False,
'nb_tracks': 0,
'fans': artistAPI['nb_fan'],
'link': "https://www.deezer.com/artist/"+str(artistAPI['id'])+"/top_track",
'link': f"https://www.deezer.com/artist/{artistAPI['id']}/top_track",
'share': None,
'picture': artistAPI['picture'],
'picture_small': artistAPI['picture_small'],
@ -242,10 +238,10 @@ def generateArtistTopItem(dz, link_id, bitrate, interface=None):
'picture_big': artistAPI['picture_big'],
'picture_xl': artistAPI['picture_xl'],
'checksum': None,
'tracklist': "https://api.deezer.com/artist/"+str(artistAPI['id'])+"/top",
'tracklist': f"https://api.deezer.com/artist/{artistAPI['id']}/top",
'creation_date': "XXXX-00-00",
'creator': {
'id': "art_"+str(artistAPI['id']),
'id': f"art_{artistAPI['id']}",
'name': artistAPI['name'],
'type': "user"
},
@ -254,3 +250,45 @@ def generateArtistTopItem(dz, link_id, bitrate, interface=None):
artistTopTracksAPI_gw = dz.gw.get_artist_toptracks(link_id)
return generatePlaylistItem(dz, playlistAPI['id'], bitrate, playlistAPI=playlistAPI, playlistTracksAPI=artistTopTracksAPI_gw)
class GenerationError(Exception):
def __init__(self, link, message, errid=None):
super().__init__()
self.link = link
self.message = message
self.errid = errid
def toDict(self):
return {
'link': self.link,
'error': self.message,
'errid': self.errid
}
class ISRCnotOnDeezer(GenerationError):
def __init__(self, link):
super().__init__(link, "Track ISRC is not available on deezer", "ISRCnotOnDeezer")
class NotYourPrivatePlaylist(GenerationError):
def __init__(self, link):
super().__init__(link, "You can't download others private playlists.", "notYourPrivatePlaylist")
class TrackNotOnDeezer(GenerationError):
def __init__(self, link):
super().__init__(link, "Track not found on deezer!", "trackNotOnDeezer")
class AlbumNotOnDeezer(GenerationError):
def __init__(self, link):
super().__init__(link, "Album not found on deezer!", "albumNotOnDeezer")
class InvalidID(GenerationError):
def __init__(self, link):
super().__init__(link, "Link ID is invalid!", "invalidID")
class LinkNotSupported(GenerationError):
def __init__(self, link):
super().__init__(link, "Link is not supported.", "unsupportedURL")
class LinkNotRecognized(GenerationError):
def __init__(self, link):
super().__init__(link, "Link is not recognized.", "invalidURL")

View File

@ -20,7 +20,7 @@ class FeaturesOption():
MOVE_TITLE = "2" # Move to track title
DEFAULTS = {
"downloadLocation": "",
"downloadLocation": localpaths.getMusicFolder(),
"tracknameTemplate": "%artist% - %title%",
"albumTracknameTemplate": "%tracknumber% - %title%",
"playlistTracknameTemplate": "%position% - %artist% - %title%",
@ -100,26 +100,26 @@ DEFAULTS = {
}
}
def saveSettings(settings, configFolder=None):
def save(settings, configFolder=None):
configFolder = Path(configFolder or localpaths.getConfigFolder())
makedirs(configFolder, exist_ok=True) # Create config folder if it doesn't exsist
with open(configFolder / 'config.json', 'w') as configFile:
json.dump(settings, configFile, indent=2)
def loadSettings(configFolder=None):
def load(configFolder=None):
configFolder = Path(configFolder or localpaths.getConfigFolder())
makedirs(configFolder, exist_ok=True) # Create config folder if it doesn't exsist
if not (configFolder / 'config.json').is_file(): saveSettings(DEFAULTS, configFolder) # Create config file if it doesn't exsist
if not (configFolder / 'config.json').is_file(): save(DEFAULTS, configFolder) # Create config file if it doesn't exsist
# Read config file
with open(configFolder / 'config.json', 'r') as configFile:
settings = json.load(configFile)
if checkSettings(settings) > 0: saveSettings(settings) # Check the settings and save them if something changed
if check(settings) > 0: save(settings, configFolder) # Check the settings and save them if something changed
return settings
def checkSettings(settings):
def check(settings):
changes = 0
for i_set in DEFAULTS:
if not i_set in settings or not isinstance(settings[i_set], DEFAULTS[i_set]):

View File

@ -4,10 +4,10 @@ from mutagen.id3 import ID3, ID3NoHeaderError, \
TPUB, TSRC, USLT, SYLT, APIC, IPLS, TCOM, TCOP, TCMP, Encoding, PictureType
# Adds tags to a MP3 file
def tagID3(stream, track, save):
def tagID3(path, track, save):
# Delete exsisting tags
try:
tag = ID3(stream)
tag = ID3(path)
tag.delete()
except ID3NoHeaderError:
tag = ID3()
@ -111,15 +111,15 @@ def tagID3(stream, track, save):
with open(track.album.embeddedCoverPath, 'rb') as f:
tag.add(APIC(descEncoding, mimeType, PictureType.COVER_FRONT, desc='cover', data=f.read()))
tag.save( stream,
tag.save( path,
v1=2 if save['saveID3v1'] else 0,
v2_version=3,
v23_sep=None if save['useNullSeparator'] else '/' )
# Adds tags to a FLAC file
def tagFLAC(stream, track, save):
def tagFLAC(path, track, save):
# Delete exsisting tags
tag = FLAC(stream)
tag = FLAC(path)
tag.delete()
tag.clear_pictures()

View File

@ -10,21 +10,21 @@ class Album:
def __init__(self, alb_id="0", title="", pic_md5=""):
self.id = alb_id
self.title = title
self.pic = Picture(md5=pic_md5, type="cover")
self.pic = Picture(pic_md5, "cover")
self.artist = {"Main": []}
self.artists = []
self.mainArtist = None
self.date = None
self.dateString = None
self.date = Date()
self.dateString = ""
self.trackTotal = "0"
self.discTotal = "0"
self.embeddedCoverPath = None
self.embeddedCoverURL = None
self.embeddedCoverPath = ""
self.embeddedCoverURL = ""
self.explicit = False
self.genre = []
self.barcode = "Unknown"
self.label = "Unknown"
self.copyright = None
self.copyright = ""
self.recordType = "album"
self.bitrate = 0
self.rootArtist = None
@ -32,26 +32,29 @@ class Album:
self.playlistId = None
self.owner = None
self.isPlaylist = False
def parseAlbum(self, albumAPI):
self.title = albumAPI['title']
# Getting artist image ID
# ex: https://e-cdns-images.dzcdn.net/images/artist/f2bc007e9133c946ac3c3907ddc5d2ea/56x56-000000-80-0-0.jpg
artistPicture = albumAPI['artist']['picture_small']
artistPicture = artistPicture[artistPicture.find('artist/') + 7:-24]
art_pic = albumAPI['artist']['picture_small']
art_pic = art_pic[art_pic.find('artist/') + 7:-24]
self.mainArtist = Artist(
id = albumAPI['artist']['id'],
name = albumAPI['artist']['name'],
pic_md5 = artistPicture
albumAPI['artist']['id'],
albumAPI['artist']['name'],
"Main",
art_pic
)
if albumAPI.get('root_artist'):
artistPicture = albumAPI['root_artist']['picture_small']
artistPicture = artistPicture[artistPicture.find('artist/') + 7:-24]
art_pic = albumAPI['root_artist']['picture_small']
art_pic = art_pic[art_pic.find('artist/') + 7:-24]
self.rootArtist = Artist(
id = albumAPI['root_artist']['id'],
name = albumAPI['root_artist']['name'],
pic_md5 = artistPicture
albumAPI['root_artist']['id'],
albumAPI['root_artist']['name'],
"Root",
art_pic
)
for artist in albumAPI['contributors']:
@ -60,7 +63,7 @@ class Album:
if isVariousArtists:
self.variousArtists = Artist(
id = artist['id'],
art_id = artist['id'],
name = artist['name'],
role = artist['role']
)
@ -81,10 +84,10 @@ class Album:
self.label = albumAPI.get('label', self.label)
self.explicit = bool(albumAPI.get('explicit_lyrics', False))
if 'release_date' in albumAPI:
day = albumAPI["release_date"][8:10]
month = albumAPI["release_date"][5:7]
year = albumAPI["release_date"][0:4]
self.date = Date(day, month, year)
self.date.day = albumAPI["release_date"][8:10]
self.date.month = albumAPI["release_date"][5:7]
self.date.year = albumAPI["release_date"][0:4]
self.date.fixDayMonth()
self.discTotal = albumAPI.get('nb_disk')
self.copyright = albumAPI.get('copyright')
@ -92,7 +95,8 @@ class Album:
if self.pic.md5 == "":
# Getting album cover MD5
# ex: https://e-cdns-images.dzcdn.net/images/cover/2e018122cb56986277102d2041a592c8/56x56-000000-80-0-0.jpg
self.pic.md5 = albumAPI['cover_small'][albumAPI['cover_small'].find('cover/') + 6:-24]
alb_pic = albumAPI['cover_small']
self.pic.md5 = alb_pic[alb_pic.find('cover/') + 6:-24]
if albumAPI.get('genres') and len(albumAPI['genres'].get('data', [])) > 0:
for genre in albumAPI['genres']['data']:
@ -101,8 +105,9 @@ class Album:
def parseAlbumGW(self, albumAPI_gw):
self.title = albumAPI_gw['ALB_TITLE']
self.mainArtist = Artist(
id = albumAPI_gw['ART_ID'],
name = albumAPI_gw['ART_NAME']
art_id = albumAPI_gw['ART_ID'],
name = albumAPI_gw['ART_NAME'],
role = "Main"
)
self.artists = [albumAPI_gw['ART_NAME']]
@ -113,13 +118,16 @@ class Album:
explicitLyricsStatus = albumAPI_gw.get('EXPLICIT_ALBUM_CONTENT', {}).get('EXPLICIT_LYRICS_STATUS', LyricsStatus.UNKNOWN)
self.explicit = explicitLyricsStatus in [LyricsStatus.EXPLICIT, LyricsStatus.PARTIALLY_EXPLICIT]
self.addExtraAlbumGWData(albumAPI_gw)
def addExtraAlbumGWData(self, albumAPI_gw):
if self.pic.md5 == "":
self.pic.md5 = albumAPI_gw['ALB_PICTURE']
if 'PHYSICAL_RELEASE_DATE' in albumAPI_gw:
day = albumAPI_gw["PHYSICAL_RELEASE_DATE"][8:10]
month = albumAPI_gw["PHYSICAL_RELEASE_DATE"][5:7]
year = albumAPI_gw["PHYSICAL_RELEASE_DATE"][0:4]
self.date = Date(day, month, year)
self.date.day = albumAPI_gw["PHYSICAL_RELEASE_DATE"][8:10]
self.date.month = albumAPI_gw["PHYSICAL_RELEASE_DATE"][5:7]
self.date.year = albumAPI_gw["PHYSICAL_RELEASE_DATE"][0:4]
self.date.fixDayMonth()
def makePlaylistCompilation(self, playlist):
self.variousArtists = playlist.variousArtists
@ -138,6 +146,7 @@ class Album:
self.playlistId = playlist.playlistId
self.owner = playlist.owner
self.pic = playlist.pic
self.isPlaylist = True
def removeDuplicateArtists(self):
"""Removes duplicate artists for both artist array and artists dict"""

View File

@ -5,7 +5,7 @@ class Artist:
def __init__(self, art_id="0", name="", role="", pic_md5=""):
self.id = str(art_id)
self.name = name
self.pic = Picture(md5=pic_md5, type="artist")
self.pic = Picture(md5=pic_md5, pic_type="artist")
self.role = role
self.save = True

View File

@ -1,8 +1,8 @@
class Date:
def __init__(self, day="00", month="00", year="XXXX"):
self.year = year
self.month = month
self.day = day
self.month = month
self.year = year
self.fixDayMonth()
# Fix incorrect day month when detectable

View File

@ -1,5 +1,5 @@
class IDownloadObject:
"""DownloadObject interface"""
"""DownloadObject Interface"""
def __init__(self, obj):
self.type = obj['type']
self.id = obj['id']
@ -16,7 +16,6 @@ class IDownloadObject:
self.files = obj.get('files', [])
self.progressNext = 0
self.uuid = f"{self.type}_{self.id}_{self.bitrate}"
self.ack = None
self.__type__ = None
def toDict(self):
@ -35,7 +34,6 @@ class IDownloadObject:
'progress': self.progress,
'errors': self.errors,
'files': self.files,
'ack': self.ack,
'__type__': self.__type__
}
@ -50,16 +48,29 @@ class IDownloadObject:
def getSlimmedDict(self):
light = self.toDict()
propertiesToDelete = ['single', 'collection', 'convertable']
propertiesToDelete = ['single', 'collection', 'plugin', 'conversion_data']
for prop in propertiesToDelete:
if prop in light:
del light[prop]
return light
def updateProgress(self, interface=None):
def getEssentialDict(self):
return {
'type': self.type,
'id': self.id,
'bitrate': self.bitrate,
'uuid': self.uuid,
'title': self.title,
'artist': self.artist,
'cover': self.cover,
'explicit': self.explicit,
'size': self.size
}
def updateProgress(self, listener=None):
if round(self.progressNext) != self.progress and round(self.progressNext) % 2 == 0:
self.progress = round(self.progressNext)
if interface: interface.send("updateQueue", {'uuid': self.uuid, 'progress': self.progress})
if listener: listener.send("updateQueue", {'uuid': self.uuid, 'progress': self.progress})
class Single(IDownloadObject):
def __init__(self, obj):
@ -73,13 +84,13 @@ class Single(IDownloadObject):
item['single'] = self.single
return item
def completeTrackProgress(self, interface=None):
def completeTrackProgress(self, listener=None):
self.progressNext = 100
self.updateProgress(interface)
self.updateProgress(listener)
def removeTrackProgress(self, interface=None):
def removeTrackProgress(self, listener=None):
self.progressNext = 0
self.updateProgress(interface)
self.updateProgress(listener)
class Collection(IDownloadObject):
def __init__(self, obj):
@ -92,13 +103,13 @@ class Collection(IDownloadObject):
item['collection'] = self.collection
return item
def completeTrackProgress(self, interface=None):
def completeTrackProgress(self, listener=None):
self.progressNext += (1 / self.size) * 100
self.updateProgress(interface)
self.updateProgress(listener)
def removeTrackProgress(self, interface=None):
def removeTrackProgress(self, listener=None):
self.progressNext -= (1 / self.size) * 100
self.updateProgress(interface)
self.updateProgress(listener)
class Convertable(Collection):
def __init__(self, obj):

View File

@ -19,6 +19,6 @@ class Lyrics:
else:
notEmptyLine = line + 1
while syncLyricsJson[notEmptyLine]["line"] == "":
notEmptyLine = notEmptyLine + 1
notEmptyLine += 1
timestamp = syncLyricsJson[notEmptyLine]["lrc_timestamp"]
self.sync += timestamp + syncLyricsJson[line]["line"] + "\r\n"

View File

@ -1,12 +1,9 @@
class Picture:
def __init__(self, md5="", pic_type="", url=None):
def __init__(self, md5="", pic_type=""):
self.md5 = md5
self.type = pic_type
self.staticUrl = url
def generatePictureURL(self, size, pic_format):
if self.staticUrl: return self.staticUrl
def getURL(self, size, pic_format):
url = "https://e-cdns-images.dzcdn.net/images/{}/{}/{size}x{size}".format(
self.type,
self.md5,
@ -23,3 +20,10 @@ class Picture:
return url + '-none-100-0-0.png'
return url+'.jpg'
class StaticPicture:
def __init__(self, url):
self.staticURL = url
def getURL(self):
return self.staticURL

View File

@ -1,6 +1,6 @@
from deemix.types.Artist import Artist
from deemix.types.Date import Date
from deemix.types.Picture import Picture
from deemix.types.Picture import Picture, StaticPicture
class Playlist:
def __init__(self, playlistAPI):
@ -30,20 +30,17 @@ class Playlist:
picType = url[url.find('images/')+7:]
picType = picType[:picType.find('/')]
md5 = url[url.find(picType+'/') + len(picType)+1:-24]
self.pic = Picture(
md5 = md5,
pic_type = picType
)
self.pic = Picture(md5, picType)
else:
self.pic = Picture(url = playlistAPI['picture_xl'])
self.pic = StaticPicture(playlistAPI['picture_xl'])
if 'various_artist' in playlistAPI:
pic_md5 = playlistAPI['various_artist']['picture_small']
pic_md5 = pic_md5[pic_md5.find('artist/') + 7:-24]
self.variousArtists = Artist(
art_id = playlistAPI['various_artist']['id'],
name = playlistAPI['various_artist']['name'],
role = "Main",
pic_md5 = pic_md5
playlistAPI['various_artist']['id'],
playlistAPI['various_artist']['name'],
"Main",
pic_md5
)
self.mainArtist = self.variousArtists

View File

@ -26,14 +26,14 @@ class Track:
self.duration = 0
self.fallbackID = "0"
self.filesizes = {}
self.localTrack = False
self.local = False
self.mainArtist = None
self.artist = {"Main": []}
self.artists = []
self.album = None
self.trackNumber = "0"
self.discNumber = "0"
self.date = None
self.date = Date()
self.lyrics = None
self.bpm = 0
self.contributors = {}
@ -64,7 +64,7 @@ class Track:
self.fallbackID = "0"
if 'FALLBACK' in trackAPI_gw:
self.fallbackID = trackAPI_gw['FALLBACK']['SNG_ID']
self.localTrack = int(self.id) < 0
self.local = int(self.id) < 0
def retriveFilesizes(self, dz):
guest_sid = dz.session.cookies.get('sid')
@ -87,8 +87,8 @@ class Track:
sleep(2)
self.retriveFilesizes(dz)
if len(result_json['error']):
raise APIError(result_json.dumps(result_json['error']))
response = result_json.get("results")
raise TrackError(result_json.dumps(result_json['error']))
response = result_json.get("results", {})
filesizes = {}
for key, value in response.items():
if key.startswith("FILESIZE_"):
@ -96,8 +96,8 @@ class Track:
filesizes[key+"_TESTED"] = False
self.filesizes = filesizes
def parseData(self, dz, id=None, trackAPI_gw=None, trackAPI=None, albumAPI_gw=None, albumAPI=None, playlistAPI=None):
if id and not trackAPI_gw: trackAPI_gw = dz.gw.get_track_with_fallback(id)
def parseData(self, dz, track_id=None, trackAPI_gw=None, trackAPI=None, albumAPI_gw=None, albumAPI=None, playlistAPI=None):
if track_id and not trackAPI_gw: trackAPI_gw = dz.gw.get_track_with_fallback(track_id)
elif not trackAPI_gw: raise NoDataToParse
if not trackAPI:
try: trackAPI = dz.api.get_track(trackAPI_gw['SNG_ID'])
@ -105,7 +105,7 @@ class Track:
self.parseEssentialData(trackAPI_gw, trackAPI)
if self.localTrack:
if self.local:
self.parseLocalTrackData(trackAPI_gw)
else:
self.retriveFilesizes(dz)
@ -147,6 +147,7 @@ class Track:
raise AlbumDoesntExists
# Fill missing data
if albumAPI_gw: self.album.addExtraAlbumGWData(albumAPI_gw)
if self.album.date and not self.date: self.date = self.album.date
if not self.album.discTotal: self.album.discTotal = albumAPI_gw.get('NUMBER_DISK', "1")
if not self.copyright: self.copyright = albumAPI_gw['COPYRIGHT']
@ -157,10 +158,9 @@ class Track:
self.title = ' '.join(self.title.split())
# Make sure there is at least one artist
if not len(self.artist['Main']):
if len(self.artist['Main']) == 0:
self.artist['Main'] = [self.mainArtist['name']]
self.singleDownload = trackAPI_gw.get('SINGLE_TRACK', False) # TODO: Change
self.position = trackAPI_gw.get('POSITION')
# Add playlist data if track is in a playlist
@ -178,12 +178,11 @@ class Track:
md5 = trackAPI_gw.get('ALB_PICTURE', ""),
pic_type = "cover"
)
self.mainArtist = Artist(name=trackAPI_gw['ART_NAME'])
self.mainArtist = Artist(name=trackAPI_gw['ART_NAME'], role="Main")
self.artists = [trackAPI_gw['ART_NAME']]
self.artist = {
'Main': [trackAPI_gw['ART_NAME']]
}
self.date = Date()
self.album.artist = self.artist
self.album.artists = self.artists
self.album.date = self.date
@ -207,14 +206,15 @@ class Track:
self.mainArtist = Artist(
art_id = trackAPI_gw['ART_ID'],
name = trackAPI_gw['ART_NAME'],
role = "Main",
pic_md5 = trackAPI_gw.get('ART_PICTURE')
)
if 'PHYSICAL_RELEASE_DATE' in trackAPI_gw:
day = trackAPI_gw["PHYSICAL_RELEASE_DATE"][8:10]
month = trackAPI_gw["PHYSICAL_RELEASE_DATE"][5:7]
year = trackAPI_gw["PHYSICAL_RELEASE_DATE"][0:4]
self.date = Date(day, month, year)
self.date.day = trackAPI_gw["PHYSICAL_RELEASE_DATE"][8:10]
self.date.month = trackAPI_gw["PHYSICAL_RELEASE_DATE"][5:7]
self.date.year = trackAPI_gw["PHYSICAL_RELEASE_DATE"][0:4]
self.date.fixDayMonth()
def parseTrack(self, trackAPI):
self.bpm = trackAPI['bpm']
@ -249,7 +249,7 @@ class Track:
return removeFeatures(self.title)
def getFeatTitle(self):
if self.featArtistsString and not "feat." in self.title.lower():
if self.featArtistsString and "feat." not in self.title.lower():
return f"{self.title} ({self.featArtistsString})"
return self.title
@ -259,26 +259,15 @@ class Track:
if 'Featured' in self.artist:
self.featArtistsString = "feat. "+andCommaConcat(self.artist['Featured'])
def applySettings(self, settings, TEMPDIR, embeddedImageFormat):
def applySettings(self, settings):
# Check if should save the playlist as a compilation
if self.playlist and settings['tags']['savePlaylistAsCompilation']:
self.trackNumber = self.position
self.discNumber = "1"
self.album.makePlaylistCompilation(self.playlist)
self.album.embeddedCoverURL = self.playlist.pic.generatePictureURL(settings['embeddedArtworkSize'], embeddedImageFormat)
ext = self.album.embeddedCoverURL[-4:]
if ext[0] != ".": ext = ".jpg" # Check for Spotify images
# TODO: FIX
# self.album.embeddedCoverPath = TEMPDIR / f"pl{trackAPI_gw['_EXTRA_PLAYLIST']['id']}_{settings['embeddedArtworkSize']}{ext}"
else:
if self.album.date: self.date = self.album.date
self.album.embeddedCoverURL = self.album.pic.generatePictureURL(settings['embeddedArtworkSize'], embeddedImageFormat)
ext = self.album.embeddedCoverURL[-4:]
self.album.embeddedCoverPath = TEMPDIR / f"alb{self.album.id}_{settings['embeddedArtworkSize']}{ext}"
self.dateString = self.date.format(settings['dateFormat'])
self.album.dateString = self.album.date.format(settings['dateFormat'])
@ -311,9 +300,8 @@ class Track:
self.album.title = self.album.getCleanTitle()
# Remove (Album Version) from tracks that have that
if settings['removeAlbumVersion']:
if "Album Version" in self.title:
self.title = re.sub(r' ?\(Album Version\)', "", self.title).strip()
if settings['removeAlbumVersion'] and "Album Version" in self.title:
self.title = re.sub(r' ?\(Album Version\)', "", self.title).strip()
# Change Title and Artists casing if needed
if settings['titleCasing'] != "nothing":

View File

@ -2,6 +2,12 @@ import string
from deezer import TrackFormats
import os
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/79.0.3945.130 Safari/537.36"
def canWrite(folder):
return os.access(folder, os.W_OK)
def generateReplayGainString(trackGain):
return "{0:.2f} dB".format((float(trackGain) + 18.4) * -1)
@ -67,11 +73,3 @@ def removeDuplicateArtists(artist, artists):
for role in artist.keys():
artist[role] = uniqueArray(artist[role])
return (artist, artists)
def checkFolder(folder):
try:
os.makedirs(folder, exist_ok=True)
except Exception as e:
print(str(e))
return False
return os.access(folder, os.W_OK)

26
deemix/utils/crypto.py Normal file
View File

@ -0,0 +1,26 @@
import binascii
from Cryptodome.Cipher import Blowfish, AES
from Cryptodome.Hash import MD5
def _md5(data):
h = MD5.new()
h.update(data.encode() if isinstance(data, str) else data)
return h.hexdigest()
def _ecbCrypt(key, data):
return binascii.hexlify(AES.new(key.encode(), AES.MODE_ECB).encrypt(data))
def _ecbDecrypt(key, data):
return AES.new(key.encode(), AES.MODE_ECB).decrypt(binascii.unhexlify(data.encode("utf-8")))
def generateBlowfishKey(trackId):
SECRET = 'g4el58wc0zvf9na1'
idMd5 = _md5(trackId)
bfKey = ""
for i in range(16):
bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i]))
return bfKey
def decryptChunk(key, data):
return Blowfish.new(key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07").decrypt(data)

32
deemix/utils/deezer.py Normal file
View File

@ -0,0 +1,32 @@
import requests
from deemix.utils.crypto import _md5
from deemix.utils import USER_AGENT_HEADER
CLIENT_ID = "172365"
CLIENT_SECRET = "fb0bec7ccc063dab0417eb7b0d847f34"
def getAccessToken(email, password):
password = _md5(password)
request_hash = _md5(''.join([CLIENT_ID, email, password, CLIENT_SECRET]))
response = requests.get(
'https://api.deezer.com/auth/token',
params={
'app_id': CLIENT_ID,
'login': email,
'password': password,
'hash': request_hash
},
headers={"User-Agent": USER_AGENT_HEADER}
).json()
return response.get('access_token')
def getArtFromAccessToken(accessToken):
session = requests.Session()
session.get(
"https://api.deezer.com/platform/generic/track/3135556",
headers={"Authorization": f"Bearer {accessToken}", "User-Agent": USER_AGENT_HEADER}
)
response = session.get(
'https://www.deezer.com/ajax/gw-light.php?method=user.getArl&input=3&api_version=1.0&api_token=null',
headers={"User-Agent": USER_AGENT_HEADER}
).json()
return response.get('results')

View File

@ -1,45 +1,72 @@
from pathlib import Path
import sys
import os
if os.name == 'nt':
import winreg # pylint: disable=E0401
import re
from deemix.utils import canWrite
homedata = Path.home()
userdata = ""
musicdata = ""
if os.getenv("DEEMIX_DATA_DIR"):
userdata = Path(os.getenv("DEEMIX_DATA_DIR"))
elif os.getenv("XDG_CONFIG_HOME"):
userdata = Path(os.getenv("XDG_CONFIG_HOME")) / 'deemix'
elif os.getenv("APPDATA"):
userdata = Path(os.getenv("APPDATA")) / "deemix"
elif sys.platform.startswith('darwin'):
userdata = homedata / 'Library' / 'Application Support' / 'deemix'
else:
userdata = homedata / '.config' / 'deemix'
if os.getenv("DEEMIX_MUSIC_DIR"):
musicdata = Path(os.getenv("DEEMIX_MUSIC_DIR"))
elif os.getenv("XDG_MUSIC_DIR"):
musicdata = Path(os.getenv("XDG_MUSIC_DIR")) / "deemix Music"
elif os.name == 'nt':
sub_key = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders'
music_guid = '{4BD8D571-6D19-48D3-BE97-422220080E43}'
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, sub_key) as key:
location = None
try: location = winreg.QueryValueEx(key, music_guid)[0]
except: pass
try: location = winreg.QueryValueEx(key, 'My Music')[0]
except: pass
if not location: location = homedata / "Music"
musicdata = Path(location) / "deemix Music"
else:
musicdata = homedata / "Music" / "deemix Music"
def checkPath(path):
if path == "": return ""
if not path.is_dir(): return ""
if not canWrite(path): return ""
return path
def getConfigFolder():
global userdata
if userdata != "": return userdata
if os.getenv("XDG_CONFIG_HOME") and userdata == "":
userdata = Path(os.getenv("XDG_CONFIG_HOME"))
userdata = checkPath(userdata)
if os.getenv("APPDATA") and userdata == "":
userdata = Path(os.getenv("APPDATA"))
userdata = checkPath(userdata)
if sys.platform.startswith('darwin') and userdata == "":
userdata = homedata / 'Library' / 'Application Support'
userdata = checkPath(userdata)
if userdata == "":
userdata = homedata / '.config'
userdata = checkPath(userdata)
if userdata == "": userdata = Path(os.getcwd()) / 'config'
else: userdata = userdata / 'deemix'
if os.getenv("DEEMIX_DATA_DIR"):
userdata = Path(os.getenv("DEEMIX_DATA_DIR"))
return userdata
def getMusicFolder():
global musicdata
if musicdata != "": return musicdata
if os.getenv("XDG_MUSIC_DIR") and musicdata == "":
musicdata = Path(os.getenv("XDG_MUSIC_DIR"))
musicdata = checkPath(musicdata)
if (homedata / '.config' / 'user-dirs.dirs').is_file() and musicdata == "":
with open(homedata / '.config' / 'user-dirs.dirs', 'r') as f:
userDirs = f.read()
musicdata = re.search(r"XDG_MUSIC_DIR=\"(.*)\"", userDirs).group(1)
musicdata = Path(os.path.expandvars(musicdata))
musicdata = checkPath(musicdata)
if os.name == 'nt' and musicdata == "":
musicKeys = ['My Music', '{4BD8D571-6D19-48D3-BE97-422220080E43}']
regData = os.popen(r'reg.exe query "HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"').read().split('\r\n')
for i, line in enumerate(regData):
if line == "": continue
if i == 1: continue
line = line.split(' ')
if line[1] in musicKeys:
musicdata = Path(line[3])
break
musicdata = checkPath(musicdata)
if musicdata == "":
musicdata = homedata / 'Music'
musicdata = checkPath(musicdata)
if musicdata == "": musicdata = Path(os.getcwd()) / 'music'
else: musicdata = musicdata / 'deemix Music'
if os.getenv("DEEMIX_MUSIC_DIR"):
musicdata = Path(os.getenv("DEEMIX_MUSIC_DIR"))
return musicdata

View File

@ -21,14 +21,13 @@ def fixName(txt, char='_'):
txt = normalize("NFC", txt)
return txt
def fixEndOfData(bString):
try:
bString.decode()
return True
except:
return False
def fixLongName(name):
def fixEndOfData(bString):
try:
bString.decode()
return True
except Exception:
return False
if pathSep in name:
sepName = name.split(pathSep)
name = ""
@ -63,18 +62,29 @@ def pad(num, max_val, settings):
return str(num).zfill(paddingSize)
return str(num)
def generateFilename(track, settings, template):
filename = template or "%artist% - %title%"
return settingsRegex(filename, track, settings)
def generatePath(track, downloadObject, settings):
filenameTemplate = "%artist% - %title%"
singleTrack = False
if downloadObject.type == "track":
if settings['createSingleFolder']:
filenameTemplate = settings['albumTracknameTemplate']
else:
filenameTemplate = settings['tracknameTemplate']
singleTrack = True
elif downloadObject.type == "album":
filenameTemplate = settings['albumTracknameTemplate']
else:
filenameTemplate = settings['plyalistTracknameTemplate']
def generateFilepath(track, settings):
filepath = Path(settings['downloadLocation'])
filename = generateTrackName(filenameTemplate, track, settings)
filepath = Path(settings['downloadLocation'] or '.')
artistPath = None
coverPath = None
extrasPath = None
if settings['createPlaylistFolder'] and track.playlist and not settings['tags']['savePlaylistAsCompilation']:
filepath = filepath / settingsRegexPlaylist(settings['playlistNameTemplate'], track.playlist, settings)
filepath = filepath / generatePlaylistName(settings['playlistNameTemplate'], track.playlist, settings)
if track.playlist and not settings['tags']['savePlaylistAsCompilation']:
extrasPath = filepath
@ -84,61 +94,66 @@ def generateFilepath(track, settings):
(settings['createArtistFolder'] and track.playlist and settings['tags']['savePlaylistAsCompilation']) or
(settings['createArtistFolder'] and track.playlist and settings['createStructurePlaylist'])
):
filepath = filepath / settingsRegexArtist(settings['artistNameTemplate'], track.album.mainArtist, settings, rootArtist=track.album.rootArtist)
filepath = filepath / generateArtistName(settings['artistNameTemplate'], track.album.mainArtist, settings, rootArtist=track.album.rootArtist)
artistPath = filepath
if (settings['createAlbumFolder'] and
(not track.singleDownload or (track.singleDownload and settings['createSingleFolder'])) and
(not singleTrack or (singleTrack and settings['createSingleFolder'])) and
(not track.playlist or
(track.playlist and settings['tags']['savePlaylistAsCompilation']) or
(track.playlist and settings['createStructurePlaylist'])
)
):
filepath = filepath / settingsRegexAlbum(settings['albumNameTemplate'], track.album, settings, track.playlist)
filepath = filepath / generateAlbumName(settings['albumNameTemplate'], track.album, settings, track.playlist)
coverPath = filepath
if not (track.playlist and not settings['tags']['savePlaylistAsCompilation']):
extrasPath = filepath
if not extrasPath: extrasPath = filepath
if (
int(track.album.discTotal) > 1 and (
int(track.album.discTotal) > 1 and (
(settings['createAlbumFolder'] and settings['createCDFolder']) and
(not track.singleDownload or (track.singleDownload and settings['createSingleFolder'])) and
(not singleTrack or (singleTrack and settings['createSingleFolder'])) and
(not track.playlist or
(track.playlist and settings['tags']['savePlaylistAsCompilation']) or
(track.playlist and settings['createStructurePlaylist'])
)
)
)):
filepath = filepath / f'CD{str(track.discNumber)}'
filepath = filepath / f'CD{track.discNumber}'
return (filepath, artistPath, coverPath, extrasPath)
# Remove subfolders from filename and add it to filepath
if pathSep in filename:
tempPath = filename[:filename.rfind(pathSep)]
filepath = filepath / tempPath
filename = filename[filename.rfind(pathSep) + len(pathSep):]
return (filename, filepath, artistPath, coverPath, extrasPath)
def settingsRegex(filename, track, settings):
filename = filename.replace("%title%", fixName(track.title, settings['illegalCharacterReplacer']))
filename = filename.replace("%artist%", fixName(track.mainArtist.name, settings['illegalCharacterReplacer']))
filename = filename.replace("%artists%", fixName(", ".join(track.artists), settings['illegalCharacterReplacer']))
filename = filename.replace("%allartists%", fixName(track.artistsString, settings['illegalCharacterReplacer']))
filename = filename.replace("%mainartists%", fixName(track.mainArtistsString, settings['illegalCharacterReplacer']))
def generateTrackName(filename, track, settings):
c = settings['illegalCharacterReplacer']
filename = filename.replace("%title%", fixName(track.title, c))
filename = filename.replace("%artist%", fixName(track.mainArtist.name, c))
filename = filename.replace("%artists%", fixName(", ".join(track.artists), c))
filename = filename.replace("%allartists%", fixName(track.artistsString, c))
filename = filename.replace("%mainartists%", fixName(track.mainArtistsString, c))
if track.featArtistsString:
filename = filename.replace("%featartists%", fixName('('+track.featArtistsString+')', settings['illegalCharacterReplacer']))
filename = filename.replace("%featartists%", fixName('('+track.featArtistsString+')', c))
else:
filename = filename.replace("%featartists%", '')
filename = filename.replace("%album%", fixName(track.album.title, settings['illegalCharacterReplacer']))
filename = filename.replace("%albumartist%", fixName(track.album.mainArtist.name, settings['illegalCharacterReplacer']))
filename = filename.replace("%album%", fixName(track.album.title, c))
filename = filename.replace("%albumartist%", fixName(track.album.mainArtist.name, c))
filename = filename.replace("%tracknumber%", pad(track.trackNumber, track.album.trackTotal, settings))
filename = filename.replace("%tracktotal%", str(track.album.trackTotal))
filename = filename.replace("%discnumber%", str(track.discNumber))
filename = filename.replace("%disctotal%", str(track.album.discTotal))
if len(track.album.genre) > 0:
filename = filename.replace("%genre%",
fixName(track.album.genre[0], settings['illegalCharacterReplacer']))
filename = filename.replace("%genre%", fixName(track.album.genre[0], c))
else:
filename = filename.replace("%genre%", "Unknown")
filename = filename.replace("%year%", str(track.date.year))
filename = filename.replace("%date%", track.dateString)
filename = filename.replace("%bpm%", str(track.bpm))
filename = filename.replace("%label%", fixName(track.album.label, settings['illegalCharacterReplacer']))
filename = filename.replace("%label%", fixName(track.album.label, c))
filename = filename.replace("%isrc%", track.ISRC)
filename = filename.replace("%upc%", track.album.barcode)
filename = filename.replace("%explicit%", "(Explicit)" if track.explicit else "")
@ -151,36 +166,37 @@ def settingsRegex(filename, track, settings):
filename = filename.replace("%position%", pad(track.position, track.playlist.trackTotal, settings))
else:
filename = filename.replace("%playlist_id%", '')
filename = filename.replace("%position%", pad(track.trackNumber, track.album.trackTotal, settings))
filename = filename.replace("%position%", pad(track.position, track.album.trackTotal, settings))
filename = filename.replace('\\', pathSep).replace('/', pathSep)
return antiDot(fixLongName(filename))
def settingsRegexAlbum(foldername, album, settings, playlist=None):
def generateAlbumName(foldername, album, settings, playlist=None):
c = settings['illegalCharacterReplacer']
if playlist and settings['tags']['savePlaylistAsCompilation']:
foldername = foldername.replace("%album_id%", "pl_" + str(playlist.playlistID))
foldername = foldername.replace("%genre%", "Compile")
else:
foldername = foldername.replace("%album_id%", str(album.id))
if len(album.genre) > 0:
foldername = foldername.replace("%genre%", fixName(album.genre[0], settings['illegalCharacterReplacer']))
foldername = foldername.replace("%genre%", fixName(album.genre[0], c))
else:
foldername = foldername.replace("%genre%", "Unknown")
foldername = foldername.replace("%album%", fixName(album.title, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%artist%", fixName(album.mainArtist.name, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%album%", fixName(album.title, c))
foldername = foldername.replace("%artist%", fixName(album.mainArtist.name, c))
foldername = foldername.replace("%artist_id%", str(album.mainArtist.id))
if album.rootArtist:
foldername = foldername.replace("%root_artist%", fixName(album.rootArtist.name, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%root_artist%", fixName(album.rootArtist.name, c))
foldername = foldername.replace("%root_artist_id%", str(album.rootArtist.id))
else:
foldername = foldername.replace("%root_artist%", fixName(album.mainArtist.name, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%root_artist%", fixName(album.mainArtist.name, c))
foldername = foldername.replace("%root_artist_id%", str(album.mainArtist.id))
foldername = foldername.replace("%tracktotal%", str(album.trackTotal))
foldername = foldername.replace("%disctotal%", str(album.discTotal))
foldername = foldername.replace("%type%", fixName(album.recordType.capitalize(), settings['illegalCharacterReplacer']))
foldername = foldername.replace("%type%", fixName(album.recordType.capitalize(), c))
foldername = foldername.replace("%upc%", album.barcode)
foldername = foldername.replace("%explicit%", "(Explicit)" if album.explicit else "")
foldername = foldername.replace("%label%", fixName(album.label, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%label%", fixName(album.label, c))
foldername = foldername.replace("%year%", str(album.date.year))
foldername = foldername.replace("%date%", album.dateString)
foldername = foldername.replace("%bitrate%", bitrateLabels[int(album.bitrate)])
@ -189,23 +205,25 @@ def settingsRegexAlbum(foldername, album, settings, playlist=None):
return antiDot(fixLongName(foldername))
def settingsRegexArtist(foldername, artist, settings, rootArtist=None):
foldername = foldername.replace("%artist%", fixName(artist.name, settings['illegalCharacterReplacer']))
def generateArtistName(foldername, artist, settings, rootArtist=None):
c = settings['illegalCharacterReplacer']
foldername = foldername.replace("%artist%", fixName(artist.name, c))
foldername = foldername.replace("%artist_id%", str(artist.id))
if rootArtist:
foldername = foldername.replace("%root_artist%", fixName(rootArtist.name, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%root_artist%", fixName(rootArtist.name, c))
foldername = foldername.replace("%root_artist_id%", str(rootArtist.id))
else:
foldername = foldername.replace("%root_artist%", fixName(artist.name, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%root_artist%", fixName(artist.name, c))
foldername = foldername.replace("%root_artist_id%", str(artist.id))
foldername = foldername.replace('\\', pathSep).replace('/', pathSep)
return antiDot(fixLongName(foldername))
def settingsRegexPlaylist(foldername, playlist, settings):
foldername = foldername.replace("%playlist%", fixName(playlist.title, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%playlist_id%", fixName(playlist.playlistID, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%owner%", fixName(playlist.owner['name'], settings['illegalCharacterReplacer']))
def generatePlaylistName(foldername, playlist, settings):
c = settings['illegalCharacterReplacer']
foldername = foldername.replace("%playlist%", fixName(playlist.title, c))
foldername = foldername.replace("%playlist_id%", fixName(playlist.playlistID, c))
foldername = foldername.replace("%owner%", fixName(playlist.owner['name'], c))
foldername = foldername.replace("%owner_id%", str(playlist.owner['id']))
foldername = foldername.replace("%year%", str(playlist.date.year))
foldername = foldername.replace("%date%", str(playlist.dateString))
@ -213,12 +231,13 @@ def settingsRegexPlaylist(foldername, playlist, settings):
foldername = foldername.replace('\\', pathSep).replace('/', pathSep)
return antiDot(fixLongName(foldername))
def settingsRegexPlaylistFile(foldername, queueItem, settings):
foldername = foldername.replace("%title%", fixName(queueItem.title, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%artist%", fixName(queueItem.artist, settings['illegalCharacterReplacer']))
def generateDownloadObjectName(foldername, queueItem, settings):
c = settings['illegalCharacterReplacer']
foldername = foldername.replace("%title%", fixName(queueItem.title, c))
foldername = foldername.replace("%artist%", fixName(queueItem.artist, c))
foldername = foldername.replace("%size%", str(queueItem.size))
foldername = foldername.replace("%type%", fixName(queueItem.type, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%id%", fixName(queueItem.id, settings['illegalCharacterReplacer']))
foldername = foldername.replace("%type%", fixName(queueItem.type, c))
foldername = foldername.replace("%id%", fixName(queueItem.id, c))
foldername = foldername.replace("%bitrate%", bitrateLabels[int(queueItem.bitrate)])
foldername = foldername.replace('\\', pathSep).replace('/', pathSep).replace(pathSep, settings['illegalCharacterReplacer'])
foldername = foldername.replace('\\', pathSep).replace('/', pathSep).replace(pathSep, c)
return antiDot(fixLongName(foldername))

View File

@ -7,7 +7,7 @@ README = (HERE / "README.md").read_text()
setup(
name="deemix",
version="2.0.16",
version="3.0.0",
description="A barebone deezer downloader library",
long_description=README,
long_description_content_type="text/markdown",