Total rework of the library (WIP)

This commit is contained in:
RemixDev 2021-03-19 14:31:32 +01:00
parent 0f733ceaaa
commit 5ee81ced44
No known key found for this signature in database
GPG Key ID: B33962B465BDB51C
23 changed files with 1178 additions and 1986 deletions

View File

@ -1,6 +1,63 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import re
from urllib.request import urlopen
from deemix.itemgen import generateTrackItem, generateAlbumItem, generatePlaylistItem, generateArtistItem, generateArtistDiscographyItem, generateArtistTopItem
__version__ = "2.0.16" __version__ = "2.0.16"
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " \ USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/79.0.3945.130 Safari/537.36" "Chrome/79.0.3945.130 Safari/537.36"
VARIOUS_ARTISTS = "5080"
# Returns the Resolved URL, the Type and the ID
def parseLink(link):
if 'deezer.page.link' in link: link = urlopen(url).url # Resolve URL shortner
# Remove extra stuff
if '?' in link: link = link[:link.find('?')]
if '&' in link: link = link[:link.find('&')]
if link.endswith('/'): link = link[:-1] # Remove last slash if present
type = None
id = None
if not 'deezer' in link: return (link, type, id) # return if not a deezer link
if '/track' in link:
type = 'track'
id = link[link.rfind("/") + 1:]
elif '/playlist' in link:
type = 'playlist'
id = re.search("\/playlist\/(\d+)", link)[0]
elif '/album' in link:
type = 'album'
id = link[link.rfind("/") + 1:]
elif re.search("\/artist\/(\d+)\/top_track", link):
type = 'artist_top'
id = re.search("\/artist\/(\d+)\/top_track", link)[0]
elif re.search("\/artist\/(\d+)\/discography", link):
type = 'artist_discography'
id = re.search("\/artist\/(\d+)\/discography", link)[0]
elif '/artist' in link:
type = 'artist'
id = re.search("\/artist\/(\d+)", link)[0]
return (link, type, id)
def generateDownloadItem(dz, link, bitrate):
(link, type, id) = parseLink(link)
if type == None or id == None: return None
if type == "track":
return generateTrackItem(dz, id, bitrate)
elif type == "album":
return generateAlbumItem(dz, id, bitrate)
elif type == "playlist":
return generatePlaylistItem(dz, id, bitrate)
elif type == "artist":
return generateArtistItem(dz, id, bitrate)
elif type == "artist_discography":
return generateArtistDiscographyItem(dz, id, bitrate)
elif type == "artist_top":
return generateArtistTopItem(dz, id, bitrate)
return None

View File

@ -1,26 +1,65 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import click import click
from deemix.app.cli import cli
from pathlib import Path from pathlib import Path
from deezer import Deezer
from deezer import TrackFormats
from deemix import generateDownloadItem
from deemix.settings import loadSettings
from deemix.utils import getBitrateNumberFromText
import deemix.utils.localpaths as localpaths
from deemix.downloader import Downloader
@click.command() @click.command()
@click.option('--portable', is_flag=True, help='Creates the config folder in the same directory where the script is launched') @click.option('--portable', is_flag=True, help='Creates the config folder in the same directory where the script is launched')
@click.option('-b', '--bitrate', default=None, help='Overwrites the default bitrate selected') @click.option('-b', '--bitrate', default=None, help='Overwrites the default bitrate selected')
@click.option('-p', '--path', type=str, help='Downloads in the given folder') @click.option('-p', '--path', type=str, help='Downloads in the given folder')
@click.argument('url', nargs=-1, required=True) @click.argument('url', nargs=-1, required=True)
def download(url, bitrate, portable, path): def download(url, bitrate, portable, path):
# Check for local configFolder
localpath = Path('.') localpath = Path('.')
configFolder = localpath / 'config' if portable else None configFolder = localpath / 'config' if portable else localpaths.getConfigFolder()
settings = loadSettings(configFolder)
dz = Deezer(settings.get('tagsLanguage'))
def requestValidArl():
while True:
arl = input("Paste here your arl:")
if dz.login_via_arl(arl.strip()): break
return arl
if (configFolder / '.arl').is_file():
with open(configFolder / '.arl', 'r') as f:
arl = f.readline().rstrip("\n").strip()
if not dz.login_via_arl(arl): arl = requestValidArl()
else: arl = requestValidArl()
with open(configFolder / '.arl', 'w') as f:
f.write(arl)
def downloadLinks(url, bitrate=None):
if not bitrate: bitrate = settings.get("maxBitrate", TrackFormats.MP3_320)
links = []
for link in url:
if ';' in link:
for l in link.split(";"):
links.append(l)
else:
links.append(link)
for link in links:
downloadItem = generateDownloadItem(dz, link, bitrate)
Downloader(dz, downloadItem, settings).start()
if path is not None: if path is not None:
if path == '': path = '.' if path == '': path = '.'
path = Path(path) path = Path(path)
settings['downloadLocation'] = str(path)
app = cli(path, configFolder)
app.login()
url = list(url) url = list(url)
if bitrate: bitrate = getBitrateNumberFromText(bitrate)
# If first url is filepath readfile and use them as URLs
try: try:
isfile = Path(url[0]).is_file() isfile = Path(url[0]).is_file()
except: except:
@ -30,7 +69,7 @@ def download(url, bitrate, portable, path):
with open(filename) as f: with open(filename) as f:
url = f.readlines() url = f.readlines()
app.downloadLink(url, bitrate) downloadLinks(url, bitrate)
click.echo("All done!") click.echo("All done!")
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,11 +0,0 @@
from deezer import Deezer
from deemix.app.settings import Settings
from deemix.app.queuemanager import QueueManager
from deemix.app.spotifyhelper import SpotifyHelper
class deemix:
def __init__(self, configFolder=None, overwriteDownloadFolder=None):
self.set = Settings(configFolder, overwriteDownloadFolder=overwriteDownloadFolder)
self.dz = Deezer(self.set.settings.get('tagsLanguage'))
self.sp = SpotifyHelper(configFolder)
self.qm = QueueManager(self.dz, self.sp)

View File

@ -1,40 +0,0 @@
from pathlib import Path
from os import makedirs
from deemix.app import deemix
from deemix.utils import checkFolder
class cli(deemix):
def __init__(self, downloadpath, configFolder=None):
super().__init__(configFolder, overwriteDownloadFolder=downloadpath)
if downloadpath:
print("Using folder: "+self.set.settings['downloadLocation'])
def downloadLink(self, url, bitrate=None):
for link in url:
if ';' in link:
for l in link.split(";"):
self.qm.addToQueue(l, self.set.settings, bitrate)
else:
self.qm.addToQueue(link, self.set.settings, bitrate)
def requestValidArl(self):
while True:
arl = input("Paste here your arl:")
if self.dz.login_via_arl(arl):
break
return arl
def login(self):
configFolder = Path(self.set.configFolder)
if not configFolder.is_dir():
makedirs(configFolder, exist_ok=True)
if (configFolder / '.arl').is_file():
with open(configFolder / '.arl', 'r') as f:
arl = f.readline().rstrip("\n")
if not self.dz.login_via_arl(arl):
arl = self.requestValidArl()
else:
arl = self.requestValidArl()
with open(configFolder / '.arl', 'w') as f:
f.write(arl)

View File

@ -1,4 +0,0 @@
class MessageInterface:
def send(self, message, value=None):
"""Implement this class to process updates and messages from the core"""
pass

View File

@ -1,115 +0,0 @@
class QueueItem:
def __init__(self, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, size=None, type=None, settings=None, queueItemDict=None):
if queueItemDict:
self.title = queueItemDict['title']
self.artist = queueItemDict['artist']
self.cover = queueItemDict['cover']
self.explicit = queueItemDict.get('explicit', False)
self.size = queueItemDict['size']
self.type = queueItemDict['type']
self.id = queueItemDict['id']
self.bitrate = queueItemDict['bitrate']
self.extrasPath = queueItemDict.get('extrasPath', '')
self.files = queueItemDict['files']
self.downloaded = queueItemDict['downloaded']
self.failed = queueItemDict['failed']
self.errors = queueItemDict['errors']
self.progress = queueItemDict['progress']
self.settings = queueItemDict.get('settings')
else:
self.title = title
self.artist = artist
self.cover = cover
self.explicit = explicit
self.size = size
self.type = type
self.id = id
self.bitrate = bitrate
self.extrasPath = None
self.files = []
self.settings = settings
self.downloaded = 0
self.failed = 0
self.errors = []
self.progress = 0
self.uuid = f"{self.type}_{self.id}_{self.bitrate}"
self.cancel = False
self.ack = None
def toDict(self):
return {
'title': self.title,
'artist': self.artist,
'cover': self.cover,
'explicit': self.explicit,
'size': self.size,
'extrasPath': self.extrasPath,
'files': self.files,
'downloaded': self.downloaded,
'failed': self.failed,
'errors': self.errors,
'progress': self.progress,
'type': self.type,
'id': self.id,
'bitrate': self.bitrate,
'uuid': self.uuid,
'ack': self.ack
}
def getResettedItem(self):
item = self.toDict()
item['downloaded'] = 0
item['failed'] = 0
item['progress'] = 0
item['errors'] = []
return item
def getSlimmedItem(self):
light = self.toDict()
propertiesToDelete = ['single', 'collection', '_EXTRA', 'settings']
for property in propertiesToDelete:
if property in light:
del light[property]
return light
class QISingle(QueueItem):
def __init__(self, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, type=None, settings=None, single=None, queueItemDict=None):
if queueItemDict:
super().__init__(queueItemDict=queueItemDict)
self.single = queueItemDict['single']
else:
super().__init__(id, bitrate, title, artist, cover, explicit, 1, type, settings)
self.single = single
def toDict(self):
queueItem = super().toDict()
queueItem['single'] = self.single
return queueItem
class QICollection(QueueItem):
def __init__(self, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, size=None, type=None, settings=None, collection=None, queueItemDict=None):
if queueItemDict:
super().__init__(queueItemDict=queueItemDict)
self.collection = queueItemDict['collection']
else:
super().__init__(id, bitrate, title, artist, cover, explicit, size, type, settings)
self.collection = collection
def toDict(self):
queueItem = super().toDict()
queueItem['collection'] = self.collection
return queueItem
class QIConvertable(QICollection):
def __init__(self, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, size=None, type=None, settings=None, extra=None, queueItemDict=None):
if queueItemDict:
super().__init__(queueItemDict=queueItemDict)
self.extra = queueItemDict['_EXTRA']
else:
super().__init__(id, bitrate, title, artist, cover, explicit, size, type, settings, [])
self.extra = extra
def toDict(self):
queueItem = super().toDict()
queueItem['_EXTRA'] = self.extra
return queueItem

View File

@ -1,592 +0,0 @@
from deemix.app.downloadjob import DownloadJob
from deemix.utils import getIDFromLink, getTypeFromLink, getBitrateInt
from deezer import Deezer
from deezer.gw import APIError as gwAPIError, LyricsStatus
from deezer.api import APIError
from deezer.utils import map_user_playlist
from spotipy.exceptions import SpotifyException
from deemix.app.queueitem import QueueItem, QISingle, QICollection, QIConvertable
import logging
from pathlib import Path
import json
from os import remove
import uuid
from urllib.request import urlopen
import threading
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('deemix')
class QueueManager:
def __init__(self, deezerHelper=None, spotifyHelper=None):
self.queue = []
self.queueList = {}
self.queueComplete = []
self.currentItem = ""
self.dz = deezerHelper or Deezer()
self.sp = spotifyHelper
self.queueThread = None
def generateTrackQueueItem(self, id, settings, bitrate, trackAPI=None, albumAPI=None, dz=None):
if not dz: dz = self.dz
# Check if is an isrc: url
if str(id).startswith("isrc"):
try:
trackAPI = dz.api.get_track(id)
except APIError as e:
e = str(e)
return QueueError("https://deezer.com/track/"+str(id), f"Wrong URL: {e}")
if 'id' in trackAPI and 'title' in trackAPI:
id = trackAPI['id']
else:
return QueueError("https://deezer.com/track/"+str(id), "Track ISRC is not available on deezer", "ISRCnotOnDeezer")
# Get essential track info
try:
trackAPI_gw = dz.gw.get_track_with_fallback(id)
except gwAPIError as e:
e = str(e)
message = "Wrong URL"
if "DATA_ERROR" in e: message += f": {e['DATA_ERROR']}"
return QueueError("https://deezer.com/track/"+str(id), message)
if albumAPI: trackAPI_gw['_EXTRA_ALBUM'] = albumAPI
if trackAPI: trackAPI_gw['_EXTRA_TRACK'] = trackAPI
if settings['createSingleFolder']:
trackAPI_gw['FILENAME_TEMPLATE'] = settings['albumTracknameTemplate']
else:
trackAPI_gw['FILENAME_TEMPLATE'] = settings['tracknameTemplate']
trackAPI_gw['SINGLE_TRACK'] = True
title = trackAPI_gw['SNG_TITLE'].strip()
if trackAPI_gw.get('VERSION') and trackAPI_gw['VERSION'] not in trackAPI_gw['SNG_TITLE']:
title += f" {trackAPI_gw['VERSION']}".strip()
explicit = bool(int(trackAPI_gw.get('EXPLICIT_LYRICS', 0)))
return QISingle(
id=id,
bitrate=bitrate,
title=title,
artist=trackAPI_gw['ART_NAME'],
cover=f"https://e-cdns-images.dzcdn.net/images/cover/{trackAPI_gw['ALB_PICTURE']}/75x75-000000-80-0-0.jpg",
explicit=explicit,
type='track',
settings=settings,
single=trackAPI_gw,
)
def generateAlbumQueueItem(self, id, settings, bitrate, rootArtist=None, dz=None):
if not dz: dz = self.dz
# Get essential album info
try:
albumAPI = dz.api.get_album(id)
except APIError as e:
e = str(e)
return QueueError("https://deezer.com/album/"+str(id), f"Wrong URL: {e}")
if str(id).startswith('upc'): id = albumAPI['id']
# Get extra info about album
# This saves extra api calls when downloading
albumAPI_gw = dz.gw.get_album(id)
albumAPI['nb_disk'] = albumAPI_gw['NUMBER_DISK']
albumAPI['copyright'] = albumAPI_gw['COPYRIGHT']
albumAPI['root_artist'] = rootArtist
# If the album is a single download as a track
if albumAPI['nb_tracks'] == 1:
return self.generateTrackQueueItem(albumAPI['tracks']['data'][0]['id'], settings, bitrate, albumAPI=albumAPI, dz=dz)
tracksArray = dz.gw.get_album_tracks(id)
if albumAPI['cover_small'] != None:
cover = albumAPI['cover_small'][:-24] + '/75x75-000000-80-0-0.jpg'
else:
cover = f"https://e-cdns-images.dzcdn.net/images/cover/{albumAPI_gw['ALB_PICTURE']}/75x75-000000-80-0-0.jpg"
totalSize = len(tracksArray)
albumAPI['nb_tracks'] = totalSize
collection = []
for pos, trackAPI in enumerate(tracksArray, start=1):
trackAPI['_EXTRA_ALBUM'] = albumAPI
trackAPI['POSITION'] = pos
trackAPI['SIZE'] = totalSize
trackAPI['FILENAME_TEMPLATE'] = settings['albumTracknameTemplate']
collection.append(trackAPI)
explicit = albumAPI_gw.get('EXPLICIT_ALBUM_CONTENT', {}).get('EXPLICIT_LYRICS_STATUS', LyricsStatus.UNKNOWN) in [LyricsStatus.EXPLICIT, LyricsStatus.PARTIALLY_EXPLICIT]
return QICollection(
id=id,
bitrate=bitrate,
title=albumAPI['title'],
artist=albumAPI['artist']['name'],
cover=cover,
explicit=explicit,
size=totalSize,
type='album',
settings=settings,
collection=collection,
)
def generatePlaylistQueueItem(self, id, settings, bitrate, dz=None):
if not dz: dz = self.dz
# Get essential playlist info
try:
playlistAPI = dz.api.get_playlist(id)
except:
playlistAPI = None
# Fallback to gw api if the playlist is private
if not playlistAPI:
try:
userPlaylist = dz.gw.get_playlist_page(id)
playlistAPI = map_user_playlist(userPlaylist['DATA'])
except gwAPIError as e:
e = str(e)
message = "Wrong URL"
if "DATA_ERROR" in e:
message += f": {e['DATA_ERROR']}"
return QueueError("https://deezer.com/playlist/"+str(id), message)
# Check if private playlist and owner
if not playlistAPI.get('public', False) and playlistAPI['creator']['id'] != str(dz.current_user['id']):
logger.warning("You can't download others private playlists.")
return QueueError("https://deezer.com/playlist/"+str(id), "You can't download others private playlists.", "notYourPrivatePlaylist")
playlistTracksAPI = dz.gw.get_playlist_tracks(id)
playlistAPI['various_artist'] = dz.api.get_artist(5080) # Useful for save as compilation
totalSize = len(playlistTracksAPI)
playlistAPI['nb_tracks'] = totalSize
collection = []
for pos, trackAPI in enumerate(playlistTracksAPI, start=1):
if trackAPI.get('EXPLICIT_TRACK_CONTENT', {}).get('EXPLICIT_LYRICS_STATUS', LyricsStatus.UNKNOWN) in [LyricsStatus.EXPLICIT, LyricsStatus.PARTIALLY_EXPLICIT]:
playlistAPI['explicit'] = True
trackAPI['_EXTRA_PLAYLIST'] = playlistAPI
trackAPI['POSITION'] = pos
trackAPI['SIZE'] = totalSize
trackAPI['FILENAME_TEMPLATE'] = settings['playlistTracknameTemplate']
collection.append(trackAPI)
if not 'explicit' in playlistAPI:
playlistAPI['explicit'] = False
return QICollection(
id=id,
bitrate=bitrate,
title=playlistAPI['title'],
artist=playlistAPI['creator']['name'],
cover=playlistAPI['picture_small'][:-24] + '/75x75-000000-80-0-0.jpg',
explicit=playlistAPI['explicit'],
size=totalSize,
type='playlist',
settings=settings,
collection=collection,
)
def generateArtistQueueItem(self, id, settings, bitrate, dz=None, interface=None):
if not dz: dz = self.dz
# Get essential artist info
try:
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = str(e)
return QueueError("https://deezer.com/artist/"+str(id), f"Wrong URL: {e}")
if interface: interface.send("startAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
rootArtist = {
'id': artistAPI['id'],
'name': artistAPI['name']
}
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(id, 100)
allReleases = artistDiscographyAPI.pop('all', [])
albumList = []
for album in allReleases:
albumList.append(self.generateAlbumQueueItem(album['id'], settings, bitrate, rootArtist=rootArtist, dz=dz))
if interface: interface.send("finishAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
return albumList
def generateArtistDiscographyQueueItem(self, id, settings, bitrate, dz=None, interface=None):
if not dz: dz = self.dz
# Get essential artist info
try:
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = str(e)
return QueueError("https://deezer.com/artist/"+str(id)+"/discography", f"Wrong URL: {e}")
if interface: interface.send("startAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
rootArtist = {
'id': artistAPI['id'],
'name': artistAPI['name']
}
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(id, 100)
artistDiscographyAPI.pop('all', None) # all contains albums and singles, so its all duplicates. This removes them
albumList = []
for type in artistDiscographyAPI:
for album in artistDiscographyAPI[type]:
albumList.append(self.generateAlbumQueueItem(album['id'], settings, bitrate, rootArtist=rootArtist, dz=dz))
if interface: interface.send("finishAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
return albumList
def generateArtistTopQueueItem(self, id, settings, bitrate, dz=None, interface=None):
if not dz: dz = self.dz
# Get essential artist info
try:
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = str(e)
return QueueError("https://deezer.com/artist/"+str(id)+"/top_track", f"Wrong URL: {e}")
# Emulate the creation of a playlist
# Can't use generatePlaylistQueueItem as this is not a real playlist
playlistAPI = {
'id': str(artistAPI['id'])+"_top_track",
'title': artistAPI['name']+" - Top Tracks",
'description': "Top Tracks for "+artistAPI['name'],
'duration': 0,
'public': True,
'is_loved_track': False,
'collaborative': False,
'nb_tracks': 0,
'fans': artistAPI['nb_fan'],
'link': "https://www.deezer.com/artist/"+str(artistAPI['id'])+"/top_track",
'share': None,
'picture': artistAPI['picture'],
'picture_small': artistAPI['picture_small'],
'picture_medium': artistAPI['picture_medium'],
'picture_big': artistAPI['picture_big'],
'picture_xl': artistAPI['picture_xl'],
'checksum': None,
'tracklist': "https://api.deezer.com/artist/"+str(artistAPI['id'])+"/top",
'creation_date': "XXXX-00-00",
'creator': {
'id': "art_"+str(artistAPI['id']),
'name': artistAPI['name'],
'type': "user"
},
'type': "playlist"
}
artistTopTracksAPI_gw = dz.gw.get_artist_toptracks(id)
playlistAPI['various_artist'] = dz.api.get_artist(5080) # Useful for save as compilation
totalSize = len(artistTopTracksAPI_gw)
playlistAPI['nb_tracks'] = totalSize
collection = []
for pos, trackAPI in enumerate(artistTopTracksAPI_gw, start=1):
if trackAPI.get('EXPLICIT_TRACK_CONTENT', {}).get('EXPLICIT_LYRICS_STATUS', LyricsStatus.UNKNOWN) in [LyricsStatus.EXPLICIT, LyricsStatus.PARTIALLY_EXPLICIT]:
playlistAPI['explicit'] = True
trackAPI['_EXTRA_PLAYLIST'] = playlistAPI
trackAPI['POSITION'] = pos
trackAPI['SIZE'] = totalSize
trackAPI['FILENAME_TEMPLATE'] = settings['playlistTracknameTemplate']
collection.append(trackAPI)
if not 'explicit' in playlistAPI:
playlistAPI['explicit'] = False
return QICollection(
id=id,
bitrate=bitrate,
title=playlistAPI['title'],
artist=playlistAPI['creator']['name'],
cover=playlistAPI['picture_small'][:-24] + '/75x75-000000-80-0-0.jpg',
explicit=playlistAPI['explicit'],
size=totalSize,
type='playlist',
settings=settings,
collection=collection,
)
def generateQueueItem(self, url, settings, bitrate=None, dz=None, interface=None):
if not dz: dz = self.dz
bitrate = getBitrateInt(bitrate) or settings['maxBitrate']
if 'deezer.page.link' in url: url = urlopen(url).url
if 'link.tospotify.com' in url: url = urlopen(url).url
type = getTypeFromLink(url)
id = getIDFromLink(url, type)
if type == None or id == None:
logger.warn("URL not recognized")
return QueueError(url, "URL not recognized", "invalidURL")
if type == "track":
return self.generateTrackQueueItem(id, settings, bitrate, dz=dz)
elif type == "album":
return self.generateAlbumQueueItem(id, settings, bitrate, dz=dz)
elif type == "playlist":
return self.generatePlaylistQueueItem(id, settings, bitrate, dz=dz)
elif type == "artist":
return self.generateArtistQueueItem(id, settings, bitrate, interface=interface, dz=dz)
elif type == "artistdiscography":
return self.generateArtistDiscographyQueueItem(id, settings, bitrate, interface=interface, dz=dz)
elif type == "artisttop":
return self.generateArtistTopQueueItem(id, settings, bitrate, interface=interface, dz=dz)
elif type.startswith("spotify") and self.sp:
if not self.sp.spotifyEnabled:
logger.warn("Spotify Features is not setted up correctly.")
return QueueError(url, "Spotify Features is not setted up correctly.", "spotifyDisabled")
if type == "spotifytrack":
try:
(track_id, trackAPI, _) = self.sp.get_trackid_spotify(dz, id, settings['fallbackSearch'])
except SpotifyException as e:
return QueueError(url, "Wrong URL: "+e.msg[e.msg.find('\n')+2:])
except Exception as e:
return QueueError(url, "Something went wrong: "+str(e))
if track_id != "0":
return self.generateTrackQueueItem(track_id, settings, bitrate, trackAPI=trackAPI, dz=dz)
else:
logger.warn("Track not found on deezer!")
return QueueError(url, "Track not found on deezer!", "trackNotOnDeezer")
elif type == "spotifyalbum":
try:
album_id = self.sp.get_albumid_spotify(dz, id)
except SpotifyException as e:
return QueueError(url, "Wrong URL: "+e.msg[e.msg.find('\n')+2:])
except Exception as e:
return QueueError(url, "Something went wrong: "+str(e))
if album_id != "0":
return self.generateAlbumQueueItem(album_id, settings, bitrate, dz=dz)
else:
logger.warn("Album not found on deezer!")
return QueueError(url, "Album not found on deezer!", "albumNotOnDeezer")
elif type == "spotifyplaylist":
try:
return self.sp.generate_playlist_queueitem(dz, id, bitrate, settings)
except SpotifyException as e:
return QueueError(url, "Wrong URL: "+e.msg[e.msg.find('\n')+2:])
except Exception as e:
return QueueError(url, "Something went wrong: "+str(e))
logger.warn("URL not supported yet")
return QueueError(url, "URL not supported yet", "unsupportedURL")
def addToQueue(self, url, settings, bitrate=None, dz=None, interface=None, ack=None):
if not dz: dz = self.dz
if not dz.logged_in:
if interface: interface.send("loginNeededToDownload")
return False
def parseLink(link):
link = link.strip()
if link == "": return False
logger.info("Generating queue item for: "+link)
item = self.generateQueueItem(link, settings, bitrate, interface=interface, dz=dz)
# Add ack to all items
if type(item) is list:
for i in item:
if isinstance(i, QueueItem):
i.ack = ack
elif isinstance(item, QueueItem):
item.ack = ack
return item
if type(url) is list:
queueItem = []
request_uuid = str(uuid.uuid4())
if interface: interface.send("startGeneratingItems", {'uuid': request_uuid, 'total': len(url)})
for link in url:
item = parseLink(link)
if not item: continue
if type(item) is list:
queueItem += item
else:
queueItem.append(item)
if interface: interface.send("finishGeneratingItems", {'uuid': request_uuid, 'total': len(queueItem)})
if not len(queueItem):
return False
else:
queueItem = parseLink(url)
if not queueItem:
return False
def processQueueItem(item, silent=False):
if isinstance(item, QueueError):
logger.error(f"[{item.link}] {item.message}")
if interface: interface.send("queueError", item.toDict())
return False
if item.uuid in list(self.queueList.keys()):
logger.warn(f"[{item.uuid}] Already in queue, will not be added again.")
if interface and not silent: interface.send("alreadyInQueue", {'uuid': item.uuid, 'title': item.title})
return False
self.queue.append(item.uuid)
self.queueList[item.uuid] = item
logger.info(f"[{item.uuid}] Added to queue.")
return True
if type(queueItem) is list:
slimmedItems = []
for item in queueItem:
if processQueueItem(item, silent=True):
slimmedItems.append(item.getSlimmedItem())
else:
continue
if not len(slimmedItems):
return False
if interface: interface.send("addedToQueue", slimmedItems)
else:
if processQueueItem(queueItem):
if interface: interface.send("addedToQueue", queueItem.getSlimmedItem())
else:
return False
self.startQueue(interface, dz)
return True
def nextItem(self, dz=None, interface=None):
if not dz: dz = self.dz
# Check that nothing is already downloading and
# that the queue is not empty
if self.currentItem != "" or not len(self.queue):
self.queueThread = None
return None
self.currentItem = self.queue.pop(0)
if isinstance(self.queueList[self.currentItem], QIConvertable) and self.queueList[self.currentItem].extra:
logger.info(f"[{self.currentItem}] Converting tracks to deezer.")
self.sp.convert_spotify_playlist(dz, self.queueList[self.currentItem], interface=interface)
logger.info(f"[{self.currentItem}] Tracks converted.")
if interface: interface.send("startDownload", self.currentItem)
logger.info(f"[{self.currentItem}] Started downloading.")
DownloadJob(dz, self.queueList[self.currentItem], interface).start()
if self.queueList[self.currentItem].cancel:
del self.queueList[self.currentItem]
else:
self.queueComplete.append(self.currentItem)
logger.info(f"[{self.currentItem}] Finished downloading.")
self.currentItem = ""
self.nextItem(dz, interface)
def getQueue(self):
return (self.queue, self.queueComplete, self.slimQueueList(), self.currentItem)
def saveQueue(self, configFolder):
if len(self.queueList) > 0:
if self.currentItem != "":
self.queue.insert(0, self.currentItem)
with open(Path(configFolder) / 'queue.json', 'w') as f:
json.dump({
'queue': self.queue,
'queueComplete': self.queueComplete,
'queueList': self.exportQueueList()
}, f)
def exportQueueList(self):
queueList = {}
for uuid in self.queueList:
if uuid in self.queue:
queueList[uuid] = self.queueList[uuid].getResettedItem()
else:
queueList[uuid] = self.queueList[uuid].toDict()
return queueList
def slimQueueList(self):
queueList = {}
for uuid in self.queueList:
queueList[uuid] = self.queueList[uuid].getSlimmedItem()
return queueList
def loadQueue(self, configFolder, settings, interface=None):
configFolder = Path(configFolder)
if (configFolder / 'queue.json').is_file() and not len(self.queue):
if interface: interface.send('restoringQueue')
with open(configFolder / 'queue.json', 'r') as f:
try:
qd = json.load(f)
except json.decoder.JSONDecodeError:
logger.warn("Saved queue is corrupted, resetting it")
qd = {
'queue': [],
'queueComplete': [],
'queueList': {}
}
remove(configFolder / 'queue.json')
self.restoreQueue(qd['queue'], qd['queueComplete'], qd['queueList'], settings)
if interface:
interface.send('init_downloadQueue', {
'queue': self.queue,
'queueComplete': self.queueComplete,
'queueList': self.slimQueueList(),
'restored': True
})
def startQueue(self, interface=None, dz=None):
if not dz: dz = self.dz
if dz.logged_in and not self.queueThread:
self.queueThread = threading.Thread(target=self.nextItem, args=(dz, interface))
self.queueThread.start()
def restoreQueue(self, queue, queueComplete, queueList, settings):
self.queue = queue
self.queueComplete = queueComplete
self.queueList = {}
for uuid in queueList:
if 'single' in queueList[uuid]:
self.queueList[uuid] = QISingle(queueItemDict = queueList[uuid])
if 'collection' in queueList[uuid]:
self.queueList[uuid] = QICollection(queueItemDict = queueList[uuid])
if '_EXTRA' in queueList[uuid]:
self.queueList[uuid] = QIConvertable(queueItemDict = queueList[uuid])
self.queueList[uuid].settings = settings
def removeFromQueue(self, uuid, interface=None):
if uuid == self.currentItem:
if interface: interface.send("cancellingCurrentItem", uuid)
self.queueList[uuid].cancel = True
return
if uuid in self.queue:
self.queue.remove(uuid)
elif uuid in self.queueComplete:
self.queueComplete.remove(uuid)
else:
return
del self.queueList[uuid]
if interface: interface.send("removedFromQueue", uuid)
def cancelAllDownloads(self, interface=None):
self.queue = []
self.queueComplete = []
if self.currentItem != "":
if interface: interface.send("cancellingCurrentItem", self.currentItem)
self.queueList[self.currentItem].cancel = True
for uuid in list(self.queueList.keys()):
if uuid != self.currentItem: del self.queueList[uuid]
if interface: interface.send("removedAllDownloads", self.currentItem)
def removeFinishedDownloads(self, interface=None):
for uuid in self.queueComplete:
del self.queueList[uuid]
self.queueComplete = []
if interface: interface.send("removedFinishedDownloads")
class QueueError:
def __init__(self, link, message, errid=None):
self.link = link
self.message = message
self.errid = errid
def toDict(self):
return {
'link': self.link,
'error': self.message,
'errid': self.errid
}

View File

@ -1,220 +0,0 @@
import json
from pathlib import Path
from os import makedirs, listdir
from deemix import __version__ as deemixVersion
from deezer import TrackFormats
from deemix.utils import checkFolder
import logging
import datetime
import platform
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('deemix')
import deemix.utils.localpaths as localpaths
class OverwriteOption():
"""Should the lib overwrite files?"""
OVERWRITE = 'y'
"""Yes, overwrite the file"""
DONT_OVERWRITE = 'n'
"""No, don't overwrite the file"""
DONT_CHECK_EXT = 'e'
"""No, and don't check for extensions"""
KEEP_BOTH = 'b'
"""No, and keep both files"""
ONLY_TAGS = 't'
"""Overwrite only the tags"""
class FeaturesOption():
"""What should I do with featured artists?"""
NO_CHANGE = "0"
"""Do nothing"""
REMOVE_TITLE = "1"
"""Remove from track title"""
REMOVE_TITLE_ALBUM = "3"
"""Remove from track title and album title"""
MOVE_TITLE = "2"
"""Move to track title"""
DEFAULT_SETTINGS = {
"downloadLocation": str(localpaths.getMusicFolder()),
"tracknameTemplate": "%artist% - %title%",
"albumTracknameTemplate": "%tracknumber% - %title%",
"playlistTracknameTemplate": "%position% - %artist% - %title%",
"createPlaylistFolder": True,
"playlistNameTemplate": "%playlist%",
"createArtistFolder": False,
"artistNameTemplate": "%artist%",
"createAlbumFolder": True,
"albumNameTemplate": "%artist% - %album%",
"createCDFolder": True,
"createStructurePlaylist": False,
"createSingleFolder": False,
"padTracks": True,
"paddingSize": "0",
"illegalCharacterReplacer": "_",
"queueConcurrency": 3,
"maxBitrate": str(TrackFormats.MP3_320),
"fallbackBitrate": True,
"fallbackSearch": False,
"logErrors": True,
"logSearched": False,
"saveDownloadQueue": False,
"overwriteFile": OverwriteOption.DONT_OVERWRITE,
"createM3U8File": False,
"playlistFilenameTemplate": "playlist",
"syncedLyrics": False,
"embeddedArtworkSize": 800,
"embeddedArtworkPNG": False,
"localArtworkSize": 1400,
"localArtworkFormat": "jpg",
"saveArtwork": True,
"coverImageTemplate": "cover",
"saveArtworkArtist": False,
"artistImageTemplate": "folder",
"jpegImageQuality": 80,
"dateFormat": "Y-M-D",
"albumVariousArtists": True,
"removeAlbumVersion": False,
"removeDuplicateArtists": False,
"tagsLanguage": "",
"featuredToTitle": FeaturesOption.NO_CHANGE,
"titleCasing": "nothing",
"artistCasing": "nothing",
"executeCommand": "",
"tags": {
"title": True,
"artist": True,
"album": True,
"cover": True,
"trackNumber": True,
"trackTotal": False,
"discNumber": True,
"discTotal": False,
"albumArtist": True,
"genre": True,
"year": True,
"date": True,
"explicit": False,
"isrc": True,
"length": True,
"barcode": True,
"bpm": True,
"replayGain": False,
"label": True,
"lyrics": False,
"syncedLyrics": False,
"copyright": False,
"composer": False,
"involvedPeople": False,
"source": False,
"savePlaylistAsCompilation": False,
"useNullSeparator": False,
"saveID3v1": True,
"multiArtistSeparator": "default",
"singleAlbumArtist": False,
"coverDescriptionUTF8": False
}
}
class Settings:
def __init__(self, configFolder=None, overwriteDownloadFolder=None):
self.settings = {}
self.configFolder = Path(configFolder or localpaths.getConfigFolder())
# Create config folder if it doesn't exsist
makedirs(self.configFolder, exist_ok=True)
# Create config file if it doesn't exsist
if not (self.configFolder / 'config.json').is_file():
with open(self.configFolder / 'config.json', 'w') as f:
json.dump(DEFAULT_SETTINGS, f, indent=2)
# Read config file
with open(self.configFolder / 'config.json', 'r') as configFile:
self.settings = json.load(configFile)
# Check for overwriteDownloadFolder
# This prevents the creation of the original download folder when
# using overwriteDownloadFolder
originalDownloadFolder = self.settings['downloadLocation']
if overwriteDownloadFolder:
overwriteDownloadFolder = str(overwriteDownloadFolder)
self.settings['downloadLocation'] = overwriteDownloadFolder
# Make sure the download path exsits, fallback to default
invalidDownloadFolder = False
if self.settings['downloadLocation'] == "" or not checkFolder(self.settings['downloadLocation']):
self.settings['downloadLocation'] = DEFAULT_SETTINGS['downloadLocation']
originalDownloadFolder = self.settings['downloadLocation']
invalidDownloadFolder = True
# Check the settings and save them if something changed
if self.settingsCheck() > 0 or invalidDownloadFolder:
makedirs(self.settings['downloadLocation'], exist_ok=True)
self.settings['downloadLocation'] = originalDownloadFolder # Prevents the saving of the overwritten path
self.saveSettings()
self.settings['downloadLocation'] = overwriteDownloadFolder or originalDownloadFolder # Restores the correct path
# LOGFILES
# Create logfile name and path
logspath = self.configFolder / 'logs'
now = datetime.datetime.now()
logfile = now.strftime("%Y-%m-%d_%H%M%S")+".log"
makedirs(logspath, exist_ok=True)
# Add handler for logging
fh = logging.FileHandler(logspath / logfile, 'w', 'utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter('%(asctime)s - [%(levelname)s] %(message)s'))
logger.addHandler(fh)
logger.info(f"{platform.platform(True, True)} - Python {platform.python_version()}, deemix {deemixVersion}")
# Only keep last 5 logfiles (to preserve disk space)
logslist = listdir(logspath)
logslist.sort()
if len(logslist)>5:
for i in range(len(logslist)-5):
(logspath / logslist[i]).unlink()
# Saves the settings
def saveSettings(self, newSettings=None, dz=None):
if newSettings:
if dz and newSettings.get('tagsLanguage') != self.settings.get('tagsLanguage'): dz.set_accept_language(newSettings.get('tagsLanguage'))
if newSettings.get('downloadLocation') != self.settings.get('downloadLocation') and not checkFolder(newSettings.get('downloadLocation')):
newSettings['downloadLocation'] = DEFAULT_SETTINGS['downloadLocation']
makedirs(newSettings['downloadLocation'], exist_ok=True)
self.settings = newSettings
with open(self.configFolder / 'config.json', 'w') as configFile:
json.dump(self.settings, configFile, indent=2)
# Checks if the default settings have changed
def settingsCheck(self):
changes = 0
for set in DEFAULT_SETTINGS:
if not set in self.settings or type(self.settings[set]) != type(DEFAULT_SETTINGS[set]):
self.settings[set] = DEFAULT_SETTINGS[set]
changes += 1
for set in DEFAULT_SETTINGS['tags']:
if not set in self.settings['tags'] or type(self.settings['tags'][set]) != type(DEFAULT_SETTINGS['tags'][set]):
self.settings['tags'][set] = DEFAULT_SETTINGS['tags'][set]
changes += 1
if self.settings['downloadLocation'] == "":
self.settings['downloadLocation'] = DEFAULT_SETTINGS['downloadLocation']
changes += 1
for template in ['tracknameTemplate', 'albumTracknameTemplate', 'playlistTracknameTemplate', 'playlistNameTemplate', 'artistNameTemplate', 'albumNameTemplate', 'playlistFilenameTemplate', 'coverImageTemplate', 'artistImageTemplate', 'paddingSize']:
if self.settings[template] == "":
self.settings[template] = DEFAULT_SETTINGS[template]
changes += 1
return changes

View File

@ -1,346 +0,0 @@
import json
from pathlib import Path
import spotipy
SpotifyClientCredentials = spotipy.oauth2.SpotifyClientCredentials
from deemix.utils.localpaths import getConfigFolder
from deemix.app.queueitem import QIConvertable
emptyPlaylist = {
'collaborative': False,
'description': "",
'external_urls': {'spotify': None},
'followers': {'total': 0, 'href': None},
'id': None,
'images': [],
'name': "Something went wrong",
'owner': {
'display_name': "Error",
'id': None
},
'public': True,
'tracks' : [],
'type': 'playlist',
'uri': None
}
class SpotifyHelper:
def __init__(self, configFolder=None):
self.credentials = {}
self.spotifyEnabled = False
self.sp = None
self.configFolder = configFolder
# Make sure config folder exists
if not self.configFolder:
self.configFolder = getConfigFolder()
self.configFolder = Path(self.configFolder)
if not self.configFolder.is_dir():
self.configFolder.mkdir()
# Make sure authCredentials exsits
if not (self.configFolder / 'authCredentials.json').is_file():
with open(self.configFolder / 'authCredentials.json', 'w') as f:
json.dump({'clientId': "", 'clientSecret': ""}, f, indent=2)
# Load spotify id and secret and check if they are usable
with open(self.configFolder / 'authCredentials.json', 'r') as credentialsFile:
self.credentials = json.load(credentialsFile)
self.checkCredentials()
self.checkValidCache()
def checkValidCache(self):
if (self.configFolder / 'spotifyCache.json').is_file():
with open(self.configFolder / 'spotifyCache.json', 'r') as spotifyCache:
try:
cache = json.load(spotifyCache)
except Exception as e:
print(str(e))
(self.configFolder / 'spotifyCache.json').unlink()
return
# Remove old versions of cache
if len(cache['tracks'].values()) and isinstance(list(cache['tracks'].values())[0], int) or \
len(cache['albums'].values()) and isinstance(list(cache['albums'].values())[0], int):
(self.configFolder / 'spotifyCache.json').unlink()
def checkCredentials(self):
if self.credentials['clientId'] == "" or self.credentials['clientSecret'] == "":
spotifyEnabled = False
else:
try:
client_credentials_manager = SpotifyClientCredentials(client_id=self.credentials['clientId'],
client_secret=self.credentials['clientSecret'])
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
self.sp.user_playlists('spotify')
self.spotifyEnabled = True
except Exception as e:
self.spotifyEnabled = False
return self.spotifyEnabled
def getCredentials(self):
return self.credentials
def setCredentials(self, spotifyCredentials):
# Remove extra spaces, just to be sure
spotifyCredentials['clientId'] = spotifyCredentials['clientId'].strip()
spotifyCredentials['clientSecret'] = spotifyCredentials['clientSecret'].strip()
# Save them to disk
with open(self.configFolder / 'authCredentials.json', 'w') as f:
json.dump(spotifyCredentials, f, indent=2)
# Check if they are usable
self.credentials = spotifyCredentials
self.checkCredentials()
# Converts spotify API playlist structure to deezer's playlist structure
def _convert_playlist_structure(self, spotify_obj):
if len(spotify_obj['images']):
url = spotify_obj['images'][0]['url']
else:
url = False
deezer_obj = {
'checksum': spotify_obj['snapshot_id'],
'collaborative': spotify_obj['collaborative'],
'creation_date': "XXXX-00-00",
'creator': {
'id': spotify_obj['owner']['id'],
'name': spotify_obj['owner']['display_name'],
'tracklist': spotify_obj['owner']['href'],
'type': "user"
},
'description': spotify_obj['description'],
'duration': 0,
'fans': spotify_obj['followers']['total'] if 'followers' in spotify_obj else 0,
'id': spotify_obj['id'],
'is_loved_track': False,
'link': spotify_obj['external_urls']['spotify'],
'nb_tracks': spotify_obj['tracks']['total'],
'picture': url,
'picture_small': url,
'picture_medium': url,
'picture_big': url,
'picture_xl': url,
'public': spotify_obj['public'],
'share': spotify_obj['external_urls']['spotify'],
'title': spotify_obj['name'],
'tracklist': spotify_obj['tracks']['href'],
'type': "playlist"
}
if not url:
deezer_obj['picture_small'] = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/56x56-000000-80-0-0.jpg"
deezer_obj['picture_medium'] = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/250x250-000000-80-0-0.jpg"
deezer_obj['picture_big'] = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/500x500-000000-80-0-0.jpg"
deezer_obj['picture_xl'] = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/1000x1000-000000-80-0-0.jpg"
return deezer_obj
# Returns deezer song_id from spotify track_id or track dict
def get_trackid_spotify(self, dz, track_id, fallbackSearch, spotifyTrack=None):
if not self.spotifyEnabled:
raise spotifyFeaturesNotEnabled
singleTrack = False
if not spotifyTrack:
if (self.configFolder / 'spotifyCache.json').is_file():
with open(self.configFolder / 'spotifyCache.json', 'r') as spotifyCache:
cache = json.load(spotifyCache)
else:
cache = {'tracks': {}, 'albums': {}}
if str(track_id) in cache['tracks']:
dz_track = None
if cache['tracks'][str(track_id)]['isrc']:
dz_track = dz.api.get_track_by_ISRC(cache['tracks'][str(track_id)]['isrc'])
dz_id = dz_track['id'] if 'id' in dz_track and 'title' in dz_track else "0"
cache['tracks'][str(track_id)]['id'] = dz_id
return (cache['tracks'][str(track_id)]['id'], dz_track, cache['tracks'][str(track_id)]['isrc'])
singleTrack = True
spotify_track = self.sp.track(track_id)
else:
spotify_track = spotifyTrack
dz_id = "0"
dz_track = None
isrc = None
if 'external_ids' in spotify_track and 'isrc' in spotify_track['external_ids']:
try:
dz_track = dz.api.get_track_by_ISRC(spotify_track['external_ids']['isrc'])
dz_id = dz_track['id'] if 'id' in dz_track and 'title' in dz_track else "0"
isrc = spotify_track['external_ids']['isrc']
except:
dz_id = dz.api.get_track_id_from_metadata(
artist=spotify_track['artists'][0]['name'],
track=spotify_track['name'],
album=spotify_track['album']['name']
) if fallbackSearch else "0"
elif fallbackSearch:
dz_id = dz.api.get_track_id_from_metadata(
artist=spotify_track['artists'][0]['name'],
track=spotify_track['name'],
album=spotify_track['album']['name']
)
if singleTrack:
cache['tracks'][str(track_id)] = {'id': dz_id, 'isrc': isrc}
with open(self.configFolder / 'spotifyCache.json', 'w') as spotifyCache:
json.dump(cache, spotifyCache)
return (dz_id, dz_track, isrc)
# Returns deezer album_id from spotify album_id
def get_albumid_spotify(self, dz, album_id):
if not self.spotifyEnabled:
raise spotifyFeaturesNotEnabled
if (self.configFolder / 'spotifyCache.json').is_file():
with open(self.configFolder / 'spotifyCache.json', 'r') as spotifyCache:
cache = json.load(spotifyCache)
else:
cache = {'tracks': {}, 'albums': {}}
if str(album_id) in cache['albums']:
return cache['albums'][str(album_id)]['id']
spotify_album = self.sp.album(album_id)
dz_album = "0"
upc = None
if 'external_ids' in spotify_album and 'upc' in spotify_album['external_ids']:
try:
dz_album = dz.api.get_album_by_UPC(spotify_album['external_ids']['upc'])
dz_album = dz_album['id'] if 'id' in dz_album else "0"
upc = spotify_album['external_ids']['upc']
except:
try:
dz_album = dz.api.get_album_by_UPC(int(spotify_album['external_ids']['upc']))
dz_album = dz_album['id'] if 'id' in dz_album else "0"
except:
dz_album = "0"
cache['albums'][str(album_id)] = {'id': dz_album, 'upc': upc}
with open(self.configFolder / 'spotifyCache.json', 'w') as spotifyCache:
json.dump(cache, spotifyCache)
return dz_album
def generate_playlist_queueitem(self, dz, playlist_id, bitrate, settings):
if not self.spotifyEnabled:
raise spotifyFeaturesNotEnabled
spotify_playlist = self.sp.playlist(playlist_id)
if len(spotify_playlist['images']):
cover = spotify_playlist['images'][0]['url']
else:
cover = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/75x75-000000-80-0-0.jpg"
playlistAPI = self._convert_playlist_structure(spotify_playlist)
playlistAPI['various_artist'] = dz.api.get_artist(5080)
extra = {}
extra['unconverted'] = []
tracklistTmp = spotify_playlist['tracks']['items']
while spotify_playlist['tracks']['next']:
spotify_playlist['tracks'] = self.sp.next(spotify_playlist['tracks'])
tracklistTmp += spotify_playlist['tracks']['items']
for item in tracklistTmp:
if item['track']:
if item['track']['explicit']:
playlistAPI['explicit'] = True
extra['unconverted'].append(item['track'])
totalSize = len(extra['unconverted'])
if not 'explicit' in playlistAPI:
playlistAPI['explicit'] = False
extra['playlistAPI'] = playlistAPI
return QIConvertable(
playlist_id,
bitrate,
spotify_playlist['name'],
spotify_playlist['owner']['display_name'],
cover,
playlistAPI['explicit'],
totalSize,
'spotify_playlist',
settings,
extra,
)
def convert_spotify_playlist(self, dz, queueItem, interface=None):
convertPercentage = 0
lastPercentage = 0
if (self.configFolder / 'spotifyCache.json').is_file():
with open(self.configFolder / 'spotifyCache.json', 'r') as spotifyCache:
cache = json.load(spotifyCache)
else:
cache = {'tracks': {}, 'albums': {}}
if interface:
interface.send("startConversion", queueItem.uuid)
collection = []
for pos, track in enumerate(queueItem.extra['unconverted'], start=1):
if queueItem.cancel:
return
if str(track['id']) in cache['tracks']:
trackID = cache['tracks'][str(track['id'])]['id']
trackAPI = None
if cache['tracks'][str(track['id'])]['isrc']:
trackAPI = dz.api.get_track_by_ISRC(cache['tracks'][str(track['id'])]['isrc'])
else:
(trackID, trackAPI, isrc) = self.get_trackid_spotify(dz, "0", queueItem.settings['fallbackSearch'], track)
cache['tracks'][str(track['id'])] = {
'id': trackID,
'isrc': isrc
}
if str(trackID) == "0":
deezerTrack = {
'SNG_ID': "0",
'SNG_TITLE': track['name'],
'DURATION': 0,
'MD5_ORIGIN': 0,
'MEDIA_VERSION': 0,
'FILESIZE': 0,
'ALB_TITLE': track['album']['name'],
'ALB_PICTURE': "",
'ART_ID': 0,
'ART_NAME': track['artists'][0]['name']
}
else:
deezerTrack = dz.gw.get_track_with_fallback(trackID)
deezerTrack['_EXTRA_PLAYLIST'] = queueItem.extra['playlistAPI']
if trackAPI:
deezerTrack['_EXTRA_TRACK'] = trackAPI
deezerTrack['POSITION'] = pos
deezerTrack['SIZE'] = queueItem.size
deezerTrack['FILENAME_TEMPLATE'] = queueItem.settings['playlistTracknameTemplate']
collection.append(deezerTrack)
convertPercentage = (pos / queueItem.size) * 100
if round(convertPercentage) != lastPercentage and round(convertPercentage) % 5 == 0:
lastPercentage = round(convertPercentage)
if interface:
interface.send("updateQueue", {'uuid': queueItem.uuid, 'conversion': lastPercentage})
queueItem.extra = None
queueItem.collection = collection
with open(self.configFolder / 'spotifyCache.json', 'w') as spotifyCache:
json.dump(cache, spotifyCache)
def get_user_playlists(self, user):
if not self.spotifyEnabled:
raise spotifyFeaturesNotEnabled
result = []
playlists = self.sp.user_playlists(user)
while playlists:
for playlist in playlists['items']:
result.append(self._convert_playlist_structure(playlist))
if playlists['next']:
playlists = self.sp.next(playlists)
else:
playlists = None
return result
def get_playlist_tracklist(self, id):
if not self.spotifyEnabled:
raise spotifyFeaturesNotEnabled
playlist = self.sp.playlist(id)
tracklist = playlist['tracks']['items']
while playlist['tracks']['next']:
playlist['tracks'] = self.sp.next(playlist['tracks'])
tracklist += playlist['tracks']['items']
playlist['tracks'] = tracklist
return playlist
class spotifyFeaturesNotEnabled(Exception):
pass

View File

@ -8,24 +8,35 @@ def _md5(data):
return h.hexdigest() return h.hexdigest()
def generateBlowfishKey(trackId): def generateBlowfishKey(trackId):
SECRET = 'g4el58wc' + '0zvf9na1' SECRET = 'g4el58wc0zvf9na1'
idMd5 = _md5(trackId) idMd5 = _md5(trackId)
bfKey = "" bfKey = ""
for i in range(16): for i in range(16):
bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i])) bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i]))
return bfKey return bfKey
def generateStreamURL(sng_id, md5, media_version, format): def generateStreamPath(sng_id, md5, media_version, format):
urlPart = b'\xa4'.join( urlPart = b'\xa4'.join(
[str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))]) [str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))])
md5val = _md5(urlPart) md5val = _md5(urlPart)
step2 = str.encode(md5val) + b'\xa4' + urlPart + b'\xa4' step2 = str.encode(md5val) + b'\xa4' + urlPart + b'\xa4'
step2 = step2 + (b'.' * (16 - (len(step2) % 16))) step2 = step2 + (b'.' * (16 - (len(step2) % 16)))
urlPart = binascii.hexlify(AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).encrypt(step2)) urlPart = binascii.hexlify(AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).encrypt(step2))
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart.decode("utf-8") return urlPart.decode("utf-8")
def reverseStreamURL(url): def reverseStreamPath(urlPart):
urlPart = url[42:]
step2 = AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).decrypt(binascii.unhexlify(urlPart.encode("utf-8"))) step2 = AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).decrypt(binascii.unhexlify(urlPart.encode("utf-8")))
(md5val, md5, format, sng_id, media_version, _) = step2.split(b'\xa4') (md5val, md5, format, sng_id, media_version, _) = step2.split(b'\xa4')
return (sng_id.decode('utf-8'), md5.decode('utf-8'), media_version.decode('utf-8'), format.decode('utf-8')) return (sng_id.decode('utf-8'), md5.decode('utf-8'), media_version.decode('utf-8'), format.decode('utf-8'))
def generateStreamURL(sng_id, md5, media_version, format):
urlPart = generateStreamPath(sng_id, md5, media_version, format)
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart
def generateUnencryptedStreamURL(sng_id, md5, media_version, format):
urlPart = generateStreamPath(sng_id, md5, media_version, format)
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/api/1/" + urlPart
def reverseStreamURL(url):
urlPart = url[url.find("/1/")+3:]
return generateStreamPath(urlPart)

File diff suppressed because it is too large Load Diff

246
deemix/itemgen.py Normal file
View File

@ -0,0 +1,246 @@
from deemix.types.DownloadObjects import Single, Collection
class GenerationError(Exception):
def __init__(self, link, message, errid=None):
self.link = link
self.message = message
self.errid = errid
def toDict(self):
return {
'link': self.link,
'error': self.message,
'errid': self.errid
}
def generateTrackItem(dz, id, bitrate, trackAPI=None, albumAPI=None):
# Check if is an isrc: url
if str(id).startswith("isrc"):
try:
trackAPI = dz.api.get_track(id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/track/"+str(id), f"Wrong URL: {e}")
if 'id' in trackAPI and 'title' in trackAPI:
id = trackAPI['id']
else:
raise GenerationError("https://deezer.com/track/"+str(id), "Track ISRC is not available on deezer", "ISRCnotOnDeezer")
# Get essential track info
try:
trackAPI_gw = dz.gw.get_track_with_fallback(id)
except gwAPIError as e:
e = str(e)
message = "Wrong URL"
if "DATA_ERROR" in e: message += f": {e['DATA_ERROR']}"
raise GenerationError("https://deezer.com/track/"+str(id), message)
title = trackAPI_gw['SNG_TITLE'].strip()
if trackAPI_gw.get('VERSION') and trackAPI_gw['VERSION'] not in trackAPI_gw['SNG_TITLE']:
title += f" {trackAPI_gw['VERSION']}".strip()
explicit = bool(int(trackAPI_gw.get('EXPLICIT_LYRICS', 0)))
return Single(
'track',
id,
bitrate,
title,
trackAPI_gw['ART_NAME'],
f"https://e-cdns-images.dzcdn.net/images/cover/{trackAPI_gw['ALB_PICTURE']}/75x75-000000-80-0-0.jpg",
explicit,
trackAPI_gw,
trackAPI,
albumAPI
)
def generateAlbumItem(dz, id, bitrate, rootArtist=None):
# Get essential album info
try:
albumAPI = dz.api.get_album(id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/album/"+str(id), f"Wrong URL: {e}")
if str(id).startswith('upc'): id = albumAPI['id']
# Get extra info about album
# This saves extra api calls when downloading
albumAPI_gw = dz.gw.get_album(id)
albumAPI['nb_disk'] = albumAPI_gw['NUMBER_DISK']
albumAPI['copyright'] = albumAPI_gw['COPYRIGHT']
albumAPI['root_artist'] = rootArtist
# If the album is a single download as a track
if albumAPI['nb_tracks'] == 1:
return generateTrackItem(dz, albumAPI['tracks']['data'][0]['id'], bitrate, albumAPI=albumAPI)
tracksArray = dz.gw.get_album_tracks(id)
if albumAPI['cover_small'] != None:
cover = albumAPI['cover_small'][:-24] + '/75x75-000000-80-0-0.jpg'
else:
cover = f"https://e-cdns-images.dzcdn.net/images/cover/{albumAPI_gw['ALB_PICTURE']}/75x75-000000-80-0-0.jpg"
totalSize = len(tracksArray)
albumAPI['nb_tracks'] = totalSize
collection = []
for pos, trackAPI in enumerate(tracksArray, start=1):
trackAPI['POSITION'] = pos
trackAPI['SIZE'] = totalSize
collection.append(trackAPI)
explicit = albumAPI_gw.get('EXPLICIT_ALBUM_CONTENT', {}).get('EXPLICIT_LYRICS_STATUS', LyricsStatus.UNKNOWN) in [LyricsStatus.EXPLICIT, LyricsStatus.PARTIALLY_EXPLICIT]
return Collection(
'album',
id,
bitrate,
albumAPI['title'],
albumAPI['artist']['name'],
cover,
explicit,
totalSize,
tracks_gw=collection,
albumAPI=albumAPI
)
def generatePlaylistItem(dz, id, bitrate, playlistAPI=None, playlistTracksAPI=None):
if not playlistAPI:
# Get essential playlist info
try:
playlistAPI = dz.api.get_playlist(id)
except:
playlistAPI = None
# Fallback to gw api if the playlist is private
if not playlistAPI:
try:
userPlaylist = dz.gw.get_playlist_page(id)
playlistAPI = map_user_playlist(userPlaylist['DATA'])
except gwAPIError as e:
e = str(e)
message = "Wrong URL"
if "DATA_ERROR" in e:
message += f": {e['DATA_ERROR']}"
raise GenerationError("https://deezer.com/playlist/"+str(id), message)
# Check if private playlist and owner
if not playlistAPI.get('public', False) and playlistAPI['creator']['id'] != str(dz.current_user['id']):
logger.warning("You can't download others private playlists.")
raise GenerationError("https://deezer.com/playlist/"+str(id), "You can't download others private playlists.", "notYourPrivatePlaylist")
if not playlistTracksAPI:
playlistTracksAPI = dz.gw.get_playlist_tracks(id)
playlistAPI['various_artist'] = dz.api.get_artist(5080) # Useful for save as compilation
totalSize = len(playlistTracksAPI)
playlistAPI['nb_tracks'] = totalSize
collection = []
for pos, trackAPI in enumerate(playlistTracksAPI, start=1):
if trackAPI.get('EXPLICIT_TRACK_CONTENT', {}).get('EXPLICIT_LYRICS_STATUS', LyricsStatus.UNKNOWN) in [LyricsStatus.EXPLICIT, LyricsStatus.PARTIALLY_EXPLICIT]:
playlistAPI['explicit'] = True
trackAPI['POSITION'] = pos
trackAPI['SIZE'] = totalSize
collection.append(trackAPI)
if not 'explicit' in playlistAPI: playlistAPI['explicit'] = False
return Collection(
'playlist',
id,
bitrate,
playlistAPI['title'],
playlistAPI['creator']['name'],
playlistAPI['picture_small'][:-24] + '/75x75-000000-80-0-0.jpg',
playlistAPI['explicit'],
totalSize,
tracks_gw=collection,
playlistAPI=playlistAPI
)
def generateArtistItem(dz, id, bitrate, interface=None):
# Get essential artist info
try:
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/artist/"+str(id), f"Wrong URL: {e}")
if interface: interface.send("startAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
rootArtist = {
'id': artistAPI['id'],
'name': artistAPI['name']
}
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(id, 100)
allReleases = artistDiscographyAPI.pop('all', [])
albumList = []
for album in allReleases:
albumList.append(generateAlbumItem(dz, album['id'], bitrate, rootArtist=rootArtist))
if interface: interface.send("finishAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
return albumList
def generateArtistDiscographyItem(dz, id, bitrate, interface=None):
# Get essential artist info
try:
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/artist/"+str(id)+"/discography", f"Wrong URL: {e}")
if interface: interface.send("startAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
rootArtist = {
'id': artistAPI['id'],
'name': artistAPI['name']
}
artistDiscographyAPI = dz.gw.get_artist_discography_tabs(id, 100)
artistDiscographyAPI.pop('all', None) # all contains albums and singles, so its all duplicates. This removes them
albumList = []
for type in artistDiscographyAPI:
for album in artistDiscographyAPI[type]:
albumList.append(generateAlbumItem(dz, album['id'], bitrate, rootArtist=rootArtist))
if interface: interface.send("finishAddingArtist", {'name': artistAPI['name'], 'id': artistAPI['id']})
return albumList
def generateArtistTopItem(dz, id, bitrate, interface=None):
# Get essential artist info
try:
artistAPI = dz.api.get_artist(id)
except APIError as e:
e = str(e)
raise GenerationError("https://deezer.com/artist/"+str(id)+"/top_track", f"Wrong URL: {e}")
# Emulate the creation of a playlist
# Can't use generatePlaylistItem directly as this is not a real playlist
playlistAPI = {
'id': str(artistAPI['id'])+"_top_track",
'title': artistAPI['name']+" - Top Tracks",
'description': "Top Tracks for "+artistAPI['name'],
'duration': 0,
'public': True,
'is_loved_track': False,
'collaborative': False,
'nb_tracks': 0,
'fans': artistAPI['nb_fan'],
'link': "https://www.deezer.com/artist/"+str(artistAPI['id'])+"/top_track",
'share': None,
'picture': artistAPI['picture'],
'picture_small': artistAPI['picture_small'],
'picture_medium': artistAPI['picture_medium'],
'picture_big': artistAPI['picture_big'],
'picture_xl': artistAPI['picture_xl'],
'checksum': None,
'tracklist': "https://api.deezer.com/artist/"+str(artistAPI['id'])+"/top",
'creation_date': "XXXX-00-00",
'creator': {
'id': "art_"+str(artistAPI['id']),
'name': artistAPI['name'],
'type': "user"
},
'type': "playlist"
}
artistTopTracksAPI_gw = dz.gw.get_artist_toptracks(id)
return generatePlaylistItem(dz, playlistAPI['id'], bitrate, playlistAPI=playlistAPI, playlistTracksAPI=artistTopTracksAPI_gw)

View File

139
deemix/settings.py Normal file
View File

@ -0,0 +1,139 @@
import json
from pathlib import Path
from os import makedirs
from deezer import TrackFormats
import deemix.utils.localpaths as localpaths
"""Should the lib overwrite files?"""
class OverwriteOption():
OVERWRITE = 'y' # Yes, overwrite the file
DONT_OVERWRITE = 'n' # No, don't overwrite the file
DONT_CHECK_EXT = 'e' # No, and don't check for extensions
KEEP_BOTH = 'b' # No, and keep both files
ONLY_TAGS = 't' # Overwrite only the tags
"""What should I do with featured artists?"""
class FeaturesOption():
NO_CHANGE = "0" # Do nothing
REMOVE_TITLE = "1" # Remove from track title
REMOVE_TITLE_ALBUM = "3" # Remove from track title and album title
MOVE_TITLE = "2" # Move to track title
DEFAULTS = {
"downloadLocation": "",
"tracknameTemplate": "%artist% - %title%",
"albumTracknameTemplate": "%tracknumber% - %title%",
"playlistTracknameTemplate": "%position% - %artist% - %title%",
"createPlaylistFolder": True,
"playlistNameTemplate": "%playlist%",
"createArtistFolder": False,
"artistNameTemplate": "%artist%",
"createAlbumFolder": True,
"albumNameTemplate": "%artist% - %album%",
"createCDFolder": True,
"createStructurePlaylist": False,
"createSingleFolder": False,
"padTracks": True,
"paddingSize": "0",
"illegalCharacterReplacer": "_",
"queueConcurrency": 3,
"maxBitrate": str(TrackFormats.MP3_320),
"fallbackBitrate": True,
"fallbackSearch": False,
"logErrors": True,
"logSearched": False,
"saveDownloadQueue": False,
"overwriteFile": OverwriteOption.DONT_OVERWRITE,
"createM3U8File": False,
"playlistFilenameTemplate": "playlist",
"syncedLyrics": False,
"embeddedArtworkSize": 800,
"embeddedArtworkPNG": False,
"localArtworkSize": 1400,
"localArtworkFormat": "jpg",
"saveArtwork": True,
"coverImageTemplate": "cover",
"saveArtworkArtist": False,
"artistImageTemplate": "folder",
"jpegImageQuality": 80,
"dateFormat": "Y-M-D",
"albumVariousArtists": True,
"removeAlbumVersion": False,
"removeDuplicateArtists": False,
"tagsLanguage": "",
"featuredToTitle": FeaturesOption.NO_CHANGE,
"titleCasing": "nothing",
"artistCasing": "nothing",
"executeCommand": "",
"tags": {
"title": True,
"artist": True,
"album": True,
"cover": True,
"trackNumber": True,
"trackTotal": False,
"discNumber": True,
"discTotal": False,
"albumArtist": True,
"genre": True,
"year": True,
"date": True,
"explicit": False,
"isrc": True,
"length": True,
"barcode": True,
"bpm": True,
"replayGain": False,
"label": True,
"lyrics": False,
"syncedLyrics": False,
"copyright": False,
"composer": False,
"involvedPeople": False,
"source": False,
"savePlaylistAsCompilation": False,
"useNullSeparator": False,
"saveID3v1": True,
"multiArtistSeparator": "default",
"singleAlbumArtist": False,
"coverDescriptionUTF8": False
}
}
def saveSettings(settings, configFolder=None):
configFolder = Path(configFolder or localpaths.getConfigFolder())
makedirs(configFolder, exist_ok=True) # Create config folder if it doesn't exsist
with open(configFolder / 'config.json', 'w') as configFile:
json.dump(settings, configFile, indent=2)
def loadSettings(configFolder=None):
configFolder = Path(configFolder or localpaths.getConfigFolder())
makedirs(configFolder, exist_ok=True) # Create config folder if it doesn't exsist
if not (configFolder / 'config.json').is_file(): saveSettings(DEFAULTS, configFolder) # Create config file if it doesn't exsist
# Read config file
with open(configFolder / 'config.json', 'r') as configFile:
settings = json.load(configFile)
if checkSettings(settings) > 0: saveSettings(settings) # Check the settings and save them if something changed
return settings
def checkSettings(settings):
changes = 0
for set in DEFAULTS:
if not set in settings or type(settings[set]) != type(DEFAULTS[set]):
settings[set] = DEFAULTS[set]
changes += 1
for set in DEFAULTS['tags']:
if not set in settings['tags'] or type(settings['tags'][set]) != type(DEFAULTS['tags'][set]):
settings['tags'][set] = DEFAULTS['tags'][set]
changes += 1
if settings['downloadLocation'] == "":
settings['downloadLocation'] = DEFAULTS['downloadLocation']
changes += 1
for template in ['tracknameTemplate', 'albumTracknameTemplate', 'playlistTracknameTemplate', 'playlistNameTemplate', 'artistNameTemplate', 'albumNameTemplate', 'playlistFilenameTemplate', 'coverImageTemplate', 'artistImageTemplate', 'paddingSize']:
if settings[template] == "":
settings[template] = DEFAULTS[template]
changes += 1
return changes

View File

@ -4,7 +4,7 @@ from deemix.utils import removeDuplicateArtists, removeFeatures
from deemix.types.Artist import Artist from deemix.types.Artist import Artist
from deemix.types.Date import Date from deemix.types.Date import Date
from deemix.types.Picture import Picture from deemix.types.Picture import Picture
from deemix import VARIOUS_ARTISTS from deemix.types import VARIOUS_ARTISTS
class Album: class Album:
def __init__(self, id="0", title="", pic_md5=""): def __init__(self, id="0", title="", pic_md5=""):

View File

@ -1,5 +1,5 @@
from deemix.types.Picture import Picture from deemix.types.Picture import Picture
from deemix import VARIOUS_ARTISTS from deemix.types import VARIOUS_ARTISTS
class Artist: class Artist:
def __init__(self, id="0", name="", role="", pic_md5=""): def __init__(self, id="0", name="", role="", pic_md5=""):

View File

@ -0,0 +1,126 @@
class IDownloadObject:
def __init__(self, type=None, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, size=None, dictItem=None):
if dictItem:
self.type = dictItem['type']
self.id = dictItem['id']
self.bitrate = dictItem['bitrate']
self.title = dictItem['title']
self.artist = dictItem['artist']
self.cover = dictItem['cover']
self.explicit = dictItem.get('explicit', False)
self.size = dictItem['size']
self.downloaded = dictItem['downloaded']
self.failed = dictItem['failed']
self.progress = dictItem['progress']
self.errors = dictItem['errors']
self.files = dictItem['files']
else:
self.type = type
self.id = id
self.bitrate = bitrate
self.title = title
self.artist = artist
self.cover = cover
self.explicit = explicit
self.size = size
self.downloaded = 0
self.failed = 0
self.progress = 0
self.errors = []
self.files = []
self.uuid = f"{self.type}_{self.id}_{self.bitrate}"
self.ack = None
self.__type__ = None
def toDict(self):
return {
'type': self.type,
'id': self.id,
'bitrate': self.bitrate,
'uuid': self.uuid,
'title': self.title,
'artist': self.artist,
'cover': self.cover,
'explicit': self.explicit,
'size': self.size,
'downloaded': self.downloaded,
'failed': self.failed,
'progress': self.progress,
'errors': self.errors,
'files': self.files,
'ack': self.ack,
'__type__': self.__type__
}
def getResettedDict(self):
item = self.toDict()
item['downloaded'] = 0
item['failed'] = 0
item['progress'] = 0
item['errors'] = []
item['files'] = []
return item
def getSlimmedDict(self):
light = self.toDict()
propertiesToDelete = ['single', 'collection', 'convertable']
for property in propertiesToDelete:
if property in light:
del light[property]
return light
class Single(IDownloadObject):
def __init__(self, type=None, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, trackAPI_gw=None, trackAPI=None, albumAPI=None, dictItem=None):
if dictItem:
super().__init__(dictItem=dictItem)
self.single = dictItem['single']
else:
super().__init__(type, id, bitrate, title, artist, cover, explicit, 1)
self.single = {
'trackAPI_gw': trackAPI_gw,
'trackAPI': trackAPI,
'albumAPI': albumAPI
}
self.__type__ = "Single"
def toDict(self):
item = super().toDict()
item['single'] = self.single
return item
class Collection(IDownloadObject):
def __init__(self, type=None, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, size=None, tracks_gw=None, albumAPI=None, playlistAPI=None, dictItem=None):
if dictItem:
super().__init__(dictItem=dictItem)
self.collection = dictItem['collection']
else:
super().__init__(type, id, bitrate, title, artist, cover, explicit, size)
self.collection = {
'tracks_gw': tracks_gw,
'albumAPI': albumAPI,
'playlistAPI': playlistAPI
}
self.__type__ = "Collection"
def toDict(self):
item = super().toDict()
item['collection'] = self.collection
return item
class Convertable(Collection):
def __init__(self, type=None, id=None, bitrate=None, title=None, artist=None, cover=None, explicit=False, size=None, plugin=None, conversion_data=None, dictItem=None):
if dictItem:
super().__init__(dictItem=dictItem)
self.plugin = dictItem['plugin']
self.conversion_data = dictItem['conversion_data']
else:
super().__init__(type, id, bitrate, title, artist, cover, explicit, size)
self.plugin = plugin
self.conversion_data = conversion_data
self.__type__ = "Convertable"
def toDict(self):
item = super().toDict()
item['plugin'] = self.plugin
item['conversion_data'] = self.conversion_data
return item

View File

@ -14,7 +14,7 @@ from deemix.types.Date import Date
from deemix.types.Picture import Picture from deemix.types.Picture import Picture
from deemix.types.Playlist import Playlist from deemix.types.Playlist import Playlist
from deemix.types.Lyrics import Lyrics from deemix.types.Lyrics import Lyrics
from deemix import VARIOUS_ARTISTS from deemix.types import VARIOUS_ARTISTS
class Track: class Track:
def __init__(self, id="0", name=""): def __init__(self, id="0", name=""):
@ -259,6 +259,91 @@ class Track:
if 'Featured' in self.artist: if 'Featured' in self.artist:
self.featArtistsString = "feat. "+andCommaConcat(self.artist['Featured']) self.featArtistsString = "feat. "+andCommaConcat(self.artist['Featured'])
def applySettings(self, settings, TEMPDIR, embeddedImageFormat):
from deemix.settings import FeaturesOption
# Check if should save the playlist as a compilation
if self.playlist and settings['tags']['savePlaylistAsCompilation']:
self.trackNumber = self.position
self.discNumber = "1"
self.album.makePlaylistCompilation(self.playlist)
self.album.embeddedCoverURL = self.playlist.pic.generatePictureURL(settings['embeddedArtworkSize'], embeddedImageFormat)
ext = self.album.embeddedCoverURL[-4:]
if ext[0] != ".": ext = ".jpg" # Check for Spotify images
self.album.embeddedCoverPath = TEMPDIR / f"pl{trackAPI_gw['_EXTRA_PLAYLIST']['id']}_{settings['embeddedArtworkSize']}{ext}"
else:
if self.album.date: self.date = self.album.date
self.album.embeddedCoverURL = self.album.pic.generatePictureURL(settings['embeddedArtworkSize'], embeddedImageFormat)
ext = self.album.embeddedCoverURL[-4:]
self.album.embeddedCoverPath = TEMPDIR / f"alb{self.album.id}_{settings['embeddedArtworkSize']}{ext}"
self.dateString = self.date.format(settings['dateFormat'])
self.album.dateString = self.album.date.format(settings['dateFormat'])
if self.playlist: self.playlist.dateString = self.playlist.date.format(settings['dateFormat'])
# Check various artist option
if settings['albumVariousArtists'] and self.album.variousArtists:
artist = self.album.variousArtists
isMainArtist = artist.role == "Main"
if artist.name not in self.album.artists:
self.album.artists.insert(0, artist.name)
if isMainArtist or artist.name not in self.album.artist['Main'] and not isMainArtist:
if not artist.role in self.album.artist:
self.album.artist[artist.role] = []
self.album.artist[artist.role].insert(0, artist.name)
self.album.mainArtist.save = not self.album.mainArtist.isVariousArtists() or settings['albumVariousArtists'] and self.album.mainArtist.isVariousArtists()
# Check removeDuplicateArtists
if settings['removeDuplicateArtists']: self.removeDuplicateArtists()
# Check if user wants the feat in the title
if str(settings['featuredToTitle']) == FeaturesOption.REMOVE_TITLE:
self.title = self.getCleanTitle()
elif str(settings['featuredToTitle']) == FeaturesOption.MOVE_TITLE:
self.title = self.getFeatTitle()
elif str(settings['featuredToTitle']) == FeaturesOption.REMOVE_TITLE_ALBUM:
self.title = self.getCleanTitle()
self.album.title = self.album.getCleanTitle()
# Remove (Album Version) from tracks that have that
if settings['removeAlbumVersion']:
if "Album Version" in self.title:
self.title = re.sub(r' ?\(Album Version\)', "", self.title).strip()
# Change Title and Artists casing if needed
if settings['titleCasing'] != "nothing":
self.title = changeCase(self.title, settings['titleCasing'])
if settings['artistCasing'] != "nothing":
self.mainArtist.name = changeCase(self.mainArtist.name, settings['artistCasing'])
for i, artist in enumerate(self.artists):
self.artists[i] = changeCase(artist, settings['artistCasing'])
for type in self.artist:
for i, artist in enumerate(self.artist[type]):
self.artist[type][i] = changeCase(artist, settings['artistCasing'])
self.generateMainFeatStrings()
# Generate artist tag
if settings['tags']['multiArtistSeparator'] == "default":
if str(settings['featuredToTitle']) == FeaturesOption.MOVE_TITLE:
self.artistsString = ", ".join(self.artist['Main'])
else:
self.artistsString = ", ".join(self.artists)
elif settings['tags']['multiArtistSeparator'] == "andFeat":
self.artistsString = self.mainArtistsString
if self.featArtistsString and str(settings['featuredToTitle']) != FeaturesOption.MOVE_TITLE:
self.artistsString += " " + self.featArtistsString
else:
separator = settings['tags']['multiArtistSeparator']
if str(settings['featuredToTitle']) == FeaturesOption.MOVE_TITLE:
self.artistsString = separator.join(self.artist['Main'])
else:
self.artistsString = separator.join(self.artists)
class TrackError(Exception): class TrackError(Exception):
"""Base class for exceptions in this module.""" """Base class for exceptions in this module."""
pass pass

View File

@ -1,7 +1 @@
from deemix.types.Date import Date VARIOUS_ARTISTS = "5080"
from deemix.types.Picture import Picture
from deemix.types.Lyrics import Lyrics
from deemix.types.Album import Album
from deemix.types.Artist import Artist
from deemix.types.Playlist import Playlist
from deemix.types.Track import Track

View File

@ -1,4 +1,3 @@
import re
import string import string
from deezer import TrackFormats from deezer import TrackFormats
import os import os
@ -6,7 +5,7 @@ import os
def generateReplayGainString(trackGain): def generateReplayGainString(trackGain):
return "{0:.2f} dB".format((float(trackGain) + 18.4) * -1) return "{0:.2f} dB".format((float(trackGain) + 18.4) * -1)
def getBitrateInt(txt): def getBitrateNumberFromText(txt):
txt = str(txt).lower() txt = str(txt).lower()
if txt in ['flac', 'lossless', '9']: if txt in ['flac', 'lossless', '9']:
return TrackFormats.FLAC return TrackFormats.FLAC
@ -23,7 +22,6 @@ def getBitrateInt(txt):
else: else:
return None return None
def changeCase(str, type): def changeCase(str, type):
if type == "lower": if type == "lower":
return str.lower() return str.lower()
@ -36,7 +34,6 @@ def changeCase(str, type):
else: else:
return str return str
def removeFeatures(title): def removeFeatures(title):
clean = title clean = title
if "(feat." in clean.lower(): if "(feat." in clean.lower():
@ -48,7 +45,6 @@ def removeFeatures(title):
clean = ' '.join(clean.split()) clean = ' '.join(clean.split())
return clean return clean
def andCommaConcat(lst): def andCommaConcat(lst):
tot = len(lst) tot = len(lst)
result = "" result = ""
@ -61,62 +57,6 @@ def andCommaConcat(lst):
result += ", " result += ", "
return result return result
def getIDFromLink(link, type):
if '?' in link:
link = link[:link.find('?')]
if link.endswith("/"):
link = link[:-1]
if link.startswith("http") and 'open.spotify.com/' in link:
if '&' in link: link = link[:link.find('&')]
if type == "spotifyplaylist":
return link[link.find("/playlist/") + 10:]
if type == "spotifytrack":
return link[link.find("/track/") + 7:]
if type == "spotifyalbum":
return link[link.find("/album/") + 7:]
elif link.startswith("spotify:"):
if type == "spotifyplaylist":
return link[link.find("playlist:") + 9:]
if type == "spotifytrack":
return link[link.find("track:") + 6:]
if type == "spotifyalbum":
return link[link.find("album:") + 6:]
elif type == "artisttop":
return re.search(r"\/artist\/(\d+)\/top_track", link)[1]
elif type == "artistdiscography":
return re.search(r"\/artist\/(\d+)\/discography", link)[1]
else:
return link[link.rfind("/") + 1:]
def getTypeFromLink(link):
type = ''
if 'spotify' in link:
type = 'spotify'
if 'playlist' in link:
type += 'playlist'
elif 'track' in link:
type += 'track'
elif 'album' in link:
type += 'album'
elif 'deezer' in link:
if '/track' in link:
type = 'track'
elif '/playlist' in link:
type = 'playlist'
elif '/album' in link:
type = 'album'
elif re.search("\/artist\/(\d+)\/top_track", link):
type = 'artisttop'
elif re.search("\/artist\/(\d+)\/discography", link):
type = 'artistdiscography'
elif '/artist' in link:
type = 'artist'
return type
def uniqueArray(arr): def uniqueArray(arr):
for iPrinc, namePrinc in enumerate(arr): for iPrinc, namePrinc in enumerate(arr):
for iRest, nRest in enumerate(arr): for iRest, nRest in enumerate(arr):

View File

@ -16,12 +16,11 @@ setup(
license="GPL3", license="GPL3",
classifiers=[ classifiers=[
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Development Status :: 4 - Beta",
"Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7",
"Operating System :: OS Independent", "Operating System :: OS Independent",
], ],
python_requires='>=3.6', python_requires='>=3.7',
packages=find_packages(exclude=("tests",)), packages=find_packages(exclude=("tests",)),
include_package_data=True, include_package_data=True,
install_requires=["click", "pycryptodomex", "mutagen", "requests", "spotipy>=2.11.0", "deezer-py"], install_requires=["click", "pycryptodomex", "mutagen", "requests", "spotipy>=2.11.0", "deezer-py"],

View File

@ -1,7 +1,7 @@
#!/usr/bin/env bash #!/usr/bin/env bash
rm -rd build rm -rd build
rm -rd dist rm -rd dist
python -m bump #python -m bump
python -m bump deemix/__init__.py #python -m bump deemix/__init__.py
python3 setup.py sdist bdist_wheel python3 setup.py sdist bdist_wheel
python3 -m twine upload dist/* python3 -m twine upload dist/*