def get_comments(self):
return comments.for_thread(self.threadid)
- @classmethod
- def by_id(cls, songid):
- songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid = ?", [songid])
- if not songs:
- raise ValueError(f"No song for ID {songid:d}")
-
- return songs[0]
-
- @classmethod
- def by_threadid(cls, threadid):
- songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.threadid = ?", [threadid])
- if not songs:
- raise ValueError(f"No song for Thread ID {songid:d}")
-
- return songs[0]
-
- @classmethod
- def get_all_for_userid(cls, userid):
- return cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.userid = ? order by songs.created desc", [userid])
-
- @classmethod
- def get_all_for_username(cls, username):
- return cls._from_db("select * from songs inner join users on songs.userid = users.userid where users.username = ? order by songs.created desc", [username])
-
- @classmethod
- def get_all_for_username_and_tag(cls, username, tag):
- return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (username = ? and tag = ?) order by songs.created desc", [username, tag])
-
- @classmethod
- def get_all_for_tag(cls, tag):
- return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (tag = ?) order by songs.created desc", [tag])
-
- @classmethod
- def get_latest(cls, count):
- return cls._from_db("select * from songs inner join users on songs.userid = users.userid order by songs.created desc limit ?", [count])
-
- @classmethod
- def get_random(cls, count):
- # Get random songs + 10 extras so I can filter out my own (I uploaded too many :/)
- songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid in (select songid from songs order by random() limit ?)", [count + 10])
- random.shuffle(songs)
-
- # Prevent my songs from showing up in the first 10 results
- for i in reversed(range(min(10, len(songs)))):
- if songs[i].username == "cfulljames":
- del songs[i]
-
- # Drop any extra songs (since we asked for 10 extras)
- songs = songs[:count]
-
- return songs
-
- @classmethod
- def get_for_playlist(cls, playlistid):
- return cls._from_db("""\
- select * from playlist_songs
- inner join songs on playlist_songs.songid = songs.songid
- inner join users on songs.userid = users.userid
- where playlistid = ?
- order by playlist_songs.position asc
- """, [playlistid])
-
- @classmethod
- def _from_db(cls, query, args=()):
- songs_data = db.query(query, args)
- tags, collabs = cls._get_info_for_songs(songs_data)
- songs = []
- for sd in songs_data:
- song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]]
- song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]]
- created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d")
- has_pfp = users.user_has_pfp(sd["userid"])
- songs.append(cls(sd["songid"], sd["userid"], sd["threadid"], sd["username"], sd["title"], sanitize_user_text(sd["description"]), created, song_tags, song_collabs, has_pfp))
- return songs
-
- @classmethod
- def _get_info_for_songs(cls, songs):
- tags = {}
- collabs = {}
- for song in songs:
- songid = song["songid"]
- tags[songid] = db.query("select (tag) from song_tags where songid = ?", [songid])
- collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid])
- return tags, collabs
+def by_id(songid):
+ songs = _from_db(
+ """
+ select * from songs
+ inner join users on songs.userid = users.userid
+ where songid = ?
+ """,
+ [songid])
+ if not songs:
+ raise ValueError(f"No song for ID {songid:d}")
+
+ return songs[0]
+
+def by_threadid(threadid):
+ songs = _from_db(
+ """
+ select * from songs
+ inner join users on songs.userid = users.userid
+ where songs.threadid = ?
+ """,
+ [threadid])
+ if not songs:
+ raise ValueError(f"No song for Thread ID {songid:d}")
+
+ return songs[0]
+
+def get_all_for_userid(userid):
+ return _from_db(
+ """
+ select * from songs
+ inner join users on songs.userid = users.userid
+ where songs.userid = ?
+ order by songs.created desc
+ """,
+ [userid])
+
+def get_all_for_username(username):
+ return _from_db(
+ """
+ select * from songs
+ inner join users on songs.userid = users.userid
+ where users.username = ?
+ order by songs.created desc
+ """,
+ [username])
+
+def get_all_for_username_and_tag(username, tag):
+ return _from_db(
+ """
+ select * from song_tags
+ inner join songs on song_tags.songid = songs.songid
+ inner join users on songs.userid = users.userid
+ where (username = ? and tag = ?)
+ order by songs.created desc
+ """,
+ [username, tag])
+
+def get_all_for_tag(tag):
+ return _from_db(
+ """
+ select * from song_tags
+ inner join songs on song_tags.songid = songs.songid
+ inner join users on songs.userid = users.userid
+ where (tag = ?)
+ order by songs.created desc
+ """,
+ [tag])
+
+def get_latest(count):
+ return _from_db(
+ """
+ select * from songs
+ inner join users on songs.userid = users.userid
+ order by songs.created desc
+ limit ?
+ """,
+ [count])
+
+def get_random(count):
+ # Get random songs + 10 extras so I can filter out my own
+ # (I uploaded too many :/)
+ songs = _from_db(
+ """
+ select * from songs
+ inner join users on songs.userid = users.userid
+ where songid in (
+ select songid from songs
+ order by random()
+ limit ?
+ )
+ """,
+ [count + 10])
+ random.shuffle(songs)
+
+ # Prevent my songs from showing up in the first 10 results
+ for i in reversed(range(min(10, len(songs)))):
+ if songs[i].username == "cfulljames":
+ del songs[i]
+
+ # Drop any extra songs (since we asked for 10 extras)
+ songs = songs[:count]
+
+ return songs
+
+def get_for_playlist(playlistid):
+ return _from_db(
+ """
+ select * from playlist_songs
+ inner join songs on playlist_songs.songid = songs.songid
+ inner join users on songs.userid = users.userid
+ where playlistid = ?
+ order by playlist_songs.position asc
+ """,
+ [playlistid])
+
+def _from_db(query, args=()):
+ songs_data = db.query(query, args)
+ tags, collabs = _get_info_for_songs(songs_data)
+ songs = []
+ for sd in songs_data:
+ song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]]
+ song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]]
+ created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d")
+ has_pfp = users.user_has_pfp(sd["userid"])
+ songs.append(Song(
+ sd["songid"],
+ sd["userid"],
+ sd["threadid"],
+ sd["username"],
+ sd["title"],
+ sanitize_user_text(sd["description"]),
+ created,
+ song_tags,
+ song_collabs,
+ has_pfp
+ ))
+ return songs
+
+def _get_info_for_songs(songs):
+ tags = {}
+ collabs = {}
+ for song in songs:
+ songid = song["songid"]
+ tags[songid] = db.query(
+ """
+ select (tag) from song_tags
+ where songid = ?
+ """,
+ [songid])
+ collabs[songid] = db.query(
+ """
+ select (name) from song_collaborators
+ where songid = ?
+ """,
+ [songid])
+ return tags, collabs
@bp.get("/edit-song")
def edit_song():
songid = int(request.args["songid"])
except ValueError:
# Invalid song id - file not found
- current_app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}")
+ current_app.logger.warning(
+ f"Failed song edit - {session['username']} "
+ f"- invalid song ID {request.args['songid']}")
abort(404)
try:
- song = Song.by_id(songid)
+ song = by_id(songid)
if not song.userid == session["userid"]:
# Can't edit someone else's song - 401 unauthorized
- current_app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song")
+ current_app.logger.warning(
+ f"Failed song edit - {session['username']} "
+ "- attempted update for unowned song")
abort(401)
except ValueError:
# Song doesn't exist - 404 file not found
- current_app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})")
+ current_app.logger.warning(
+ f"Failed song edit - {session['username']} "
+ f"- song doesn't exist ({songid})")
abort(404)
return render_template("edit-song.html", song=song, **song_colors)
current_app.logger.info(f"{username} uploaded/modified a song")
if "songid" in request.args:
# After editing an existing song, go back to song page
- return redirect(f"/song/{userid}/{request.args['songid']}?action=view")
+ return redirect(
+ f"/song/{userid}/{request.args['songid']}?action=view")
else:
# After creating a new song, go back to profile
return redirect(f"/users/{username}")
if passed:
# Move file to permanent location
- filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+ user_songs_path = datadir.get_user_songs_path(session["userid"])
+ filepath = user_songs_path / (str(song_data["songid"]) + ".mp3")
shutil.move(tmp_file.name, filepath)
else:
error = True
if not error:
# Update songs table
db.query(
- "update songs set title = ?, description = ? where songid = ?",
- [title, description, songid])
+ """
+ update songs set title = ?, description = ?
+ where songid = ?
+ """,
+ [title, description, songid])
# Update song_tags table
db.query("delete from song_tags where songid = ?", [songid])
for tag in tags:
- db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+ db.query(
+ """
+ insert into song_tags (tag, songid)
+ values (?, ?)
+ """,
+ [tag, songid])
# Update song_collaborators table
db.query("delete from song_collaborators where songid = ?", [songid])
for collab in collaborators:
- db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid])
+ db.query(
+ """
+ insert into song_collaborators (name, songid)
+ values (?, ?)
+ """,
+ [collab, songid])
db.commit()
flash_and_log(f"Successfully updated '{title}'", "success")
return True
else:
# Create comment thread
- threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"])
+ threadid = comments.create_thread(
+ comments.ThreadType.SONG, session["userid"])
+
# Create song
timestamp = datetime.now(timezone.utc).isoformat()
song_data = db.query(
- "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)",
- [session["userid"], title, description, timestamp, threadid], one=True)
- songid = song_data["songid"]
- filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+ """
+ insert into songs (userid, title, description, created, threadid)
+ values (?, ?, ?, ?, ?)
+ returning (songid)
+ """,
+ [session["userid"], title, description, timestamp, threadid],
+ one=True)
# Move file to permanent location
+ user_songs_path = datadir.get_user_songs_path(session["userid"])
+ filepath = user_songs_path / (str(song_data["songid"]) + ".mp3")
shutil.move(tmp_file.name, filepath)
# Assign tags
+ songid = song_data["songid"]
for tag in tags:
- db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+ db.query(
+ "insert into song_tags (tag, songid) values (?, ?)",
+ [tag, songid])
# Assign collaborators
for collab in collaborators:
- db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
+ db.query(
+ "insert into song_collaborators (songid, name) values (?, ?)",
+ [songid, collab])
db.commit()
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file:
out_file.close()
os.remove(out_file.name)
- result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE)
+ result = subprocess.run(
+ ["ffmpeg", "-i", tmp_file.name, out_file.name],
+ stdout=subprocess.PIPE)
if result.returncode == 0:
# Successfully converted file, overwrite original file
os.replace(out_file.name, tmp_file.name)
@bp.get("/delete-song/<int:songid>")
def delete_song(songid):
-
- song_data = db.query("select * from songs where songid = ?", [songid], one=True)
+ song_data = db.query(
+ "select * from songs where songid = ?", [songid], one=True)
if not song_data:
- current_app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist")
+ current_app.logger.warning(
+ f"Failed song delete - {session['username']} - song doesn't exist")
abort(404) # Song doesn't exist
# Users can only delete their own songs
if song_data["userid"] != session["userid"]:
- current_app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song")
+ current_app.logger.warning(
+ f"Failed song delete - {session['username']} - user doesn't own song")
abort(401)
# Delete tags, collaborators
if songpath.exists():
os.remove(songpath)
- current_app.logger.info(f"{session['username']} deleted song: {song_data['title']}")
+ current_app.logger.info(
+ f"{session['username']} deleted song: {song_data['title']}")
flash_and_log(f"Deleted '{song_data['title']}'", "success")
return redirect(f"/users/{session['username']}")
def song(userid, songid):
if request.args.get("action", None) == "view":
try:
- song = Song.by_id(songid)
+ song = by_id(songid)
if song.userid != userid:
abort(404)
except ValueError:
abort(404)
else:
- return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3")
+ return send_from_directory(
+ datadir.get_user_songs_path(userid), str(songid) + ".mp3")
@bp.get("/songs")
def view_songs():
page_colors = users.get_user_colors(user)
if tag and user:
- page_songs = Song.get_all_for_username_and_tag(user, tag)
+ page_songs = get_all_for_username_and_tag(user, tag)
elif tag:
- page_songs = Song.get_all_for_tag(tag)
+ page_songs = get_all_for_tag(tag)
elif user:
- page_songs = Song.get_all_for_username(user)
+ page_songs = get_all_for_username(user)
else:
- page_songs = Song.get_random(50)
-
- return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors)
+ page_songs = get_random(50)
+
+ return render_template(
+ "songs-by-tag.html",
+ user=user,
+ tag=tag,
+ songs=page_songs,
+ **page_colors)