From: Chris Fulljames Date: Sun, 6 Apr 2025 11:31:28 +0000 (-0400) Subject: Clean up songs.py X-Git-Url: https://littlesong.place/gitweb/?a=commitdiff_plain;h=a1a578a220735ac7f553f0586b892d3212e54cd3;p=littlesongplace.git Clean up songs.py --- diff --git a/src/littlesongplace/__init__.py b/src/littlesongplace/__init__.py index 94b4021..333b1da 100644 --- a/src/littlesongplace/__init__.py +++ b/src/littlesongplace/__init__.py @@ -71,7 +71,7 @@ def index(): titles, weights = zip(*titles) title = random.choices(titles, weights)[0] - page_songs = songs.Song.get_latest(50) + page_songs = songs.get_latest(50) return render_template("index.html", users=all_users, songs=page_songs, page_title=title) @app.get("/activity") @@ -97,7 +97,7 @@ def activity(): for comment in notifications: threadtype = comment["threadtype"] if threadtype == comments.ThreadType.SONG: - song = songs.Song.by_threadid(comment["threadid"]) + song = songs.by_threadid(comment["threadid"]) comment["songid"] = song.songid comment["title"] = song.title comment["content_userid"] = song.userid @@ -313,7 +313,7 @@ def playlists(playlistid): abort(404) # Cannot view other user's private playlist - pretend it doesn't even exist # Get songs - plist_songs = songs.Song.get_for_playlist(playlistid) + plist_songs = songs.get_for_playlist(playlistid) # Get comments plist_comments = comments.for_thread(plist_data["threadid"]) diff --git a/src/littlesongplace/comments.py b/src/littlesongplace/comments.py index 1ddeaf8..f12a857 100644 --- a/src/littlesongplace/comments.py +++ b/src/littlesongplace/comments.py @@ -74,7 +74,7 @@ def comment(): profile = None playlist = None if threadtype == ThreadType.SONG: - song = songs.Song.by_threadid(request.args["threadid"]) + song = songs.by_threadid(request.args["threadid"]) elif threadtype == ThreadType.PROFILE: profile = db.query("select * from users where threadid = ?", [request.args["threadid"]], one=True) elif threadtype == ThreadType.PLAYLIST: diff --git a/src/littlesongplace/profiles.py b/src/littlesongplace/profiles.py index fb8a4ca..2e25f62 100644 --- a/src/littlesongplace/profiles.py +++ b/src/littlesongplace/profiles.py @@ -24,7 +24,7 @@ def users_profile(profile_username): plist_data = db.query("select * from playlists where userid = ? and private = 0 order by updated desc", [profile_userid]) # Get songs for current profile - profile_songs = songs.Song.get_all_for_userid(profile_userid) + profile_songs = songs.get_all_for_userid(profile_userid) # Get comments for current profile profile_comments = comments.for_thread(profile_data["threadid"]) diff --git a/src/littlesongplace/songs.py b/src/littlesongplace/songs.py index 9637742..62be579 100644 --- a/src/littlesongplace/songs.py +++ b/src/littlesongplace/songs.py @@ -35,90 +35,162 @@ class Song: def get_comments(self): return comments.for_thread(self.threadid) - @classmethod - def by_id(cls, songid): - songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid = ?", [songid]) - if not songs: - raise ValueError(f"No song for ID {songid:d}") - - return songs[0] - - @classmethod - def by_threadid(cls, threadid): - songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.threadid = ?", [threadid]) - if not songs: - raise ValueError(f"No song for Thread ID {songid:d}") - - return songs[0] - - @classmethod - def get_all_for_userid(cls, userid): - return cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.userid = ? order by songs.created desc", [userid]) - - @classmethod - def get_all_for_username(cls, username): - return cls._from_db("select * from songs inner join users on songs.userid = users.userid where users.username = ? order by songs.created desc", [username]) - - @classmethod - def get_all_for_username_and_tag(cls, username, tag): - return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (username = ? and tag = ?) order by songs.created desc", [username, tag]) - - @classmethod - def get_all_for_tag(cls, tag): - return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (tag = ?) order by songs.created desc", [tag]) - - @classmethod - def get_latest(cls, count): - return cls._from_db("select * from songs inner join users on songs.userid = users.userid order by songs.created desc limit ?", [count]) - - @classmethod - def get_random(cls, count): - # Get random songs + 10 extras so I can filter out my own (I uploaded too many :/) - songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid in (select songid from songs order by random() limit ?)", [count + 10]) - random.shuffle(songs) - - # Prevent my songs from showing up in the first 10 results - for i in reversed(range(min(10, len(songs)))): - if songs[i].username == "cfulljames": - del songs[i] - - # Drop any extra songs (since we asked for 10 extras) - songs = songs[:count] - - return songs - - @classmethod - def get_for_playlist(cls, playlistid): - return cls._from_db("""\ - select * from playlist_songs - inner join songs on playlist_songs.songid = songs.songid - inner join users on songs.userid = users.userid - where playlistid = ? - order by playlist_songs.position asc - """, [playlistid]) - - @classmethod - def _from_db(cls, query, args=()): - songs_data = db.query(query, args) - tags, collabs = cls._get_info_for_songs(songs_data) - songs = [] - for sd in songs_data: - song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]] - song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]] - created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d") - has_pfp = users.user_has_pfp(sd["userid"]) - songs.append(cls(sd["songid"], sd["userid"], sd["threadid"], sd["username"], sd["title"], sanitize_user_text(sd["description"]), created, song_tags, song_collabs, has_pfp)) - return songs - - @classmethod - def _get_info_for_songs(cls, songs): - tags = {} - collabs = {} - for song in songs: - songid = song["songid"] - tags[songid] = db.query("select (tag) from song_tags where songid = ?", [songid]) - collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid]) - return tags, collabs +def by_id(songid): + songs = _from_db( + """ + select * from songs + inner join users on songs.userid = users.userid + where songid = ? + """, + [songid]) + if not songs: + raise ValueError(f"No song for ID {songid:d}") + + return songs[0] + +def by_threadid(threadid): + songs = _from_db( + """ + select * from songs + inner join users on songs.userid = users.userid + where songs.threadid = ? + """, + [threadid]) + if not songs: + raise ValueError(f"No song for Thread ID {songid:d}") + + return songs[0] + +def get_all_for_userid(userid): + return _from_db( + """ + select * from songs + inner join users on songs.userid = users.userid + where songs.userid = ? + order by songs.created desc + """, + [userid]) + +def get_all_for_username(username): + return _from_db( + """ + select * from songs + inner join users on songs.userid = users.userid + where users.username = ? + order by songs.created desc + """, + [username]) + +def get_all_for_username_and_tag(username, tag): + return _from_db( + """ + select * from song_tags + inner join songs on song_tags.songid = songs.songid + inner join users on songs.userid = users.userid + where (username = ? and tag = ?) + order by songs.created desc + """, + [username, tag]) + +def get_all_for_tag(tag): + return _from_db( + """ + select * from song_tags + inner join songs on song_tags.songid = songs.songid + inner join users on songs.userid = users.userid + where (tag = ?) + order by songs.created desc + """, + [tag]) + +def get_latest(count): + return _from_db( + """ + select * from songs + inner join users on songs.userid = users.userid + order by songs.created desc + limit ? + """, + [count]) + +def get_random(count): + # Get random songs + 10 extras so I can filter out my own + # (I uploaded too many :/) + songs = _from_db( + """ + select * from songs + inner join users on songs.userid = users.userid + where songid in ( + select songid from songs + order by random() + limit ? + ) + """, + [count + 10]) + random.shuffle(songs) + + # Prevent my songs from showing up in the first 10 results + for i in reversed(range(min(10, len(songs)))): + if songs[i].username == "cfulljames": + del songs[i] + + # Drop any extra songs (since we asked for 10 extras) + songs = songs[:count] + + return songs + +def get_for_playlist(playlistid): + return _from_db( + """ + select * from playlist_songs + inner join songs on playlist_songs.songid = songs.songid + inner join users on songs.userid = users.userid + where playlistid = ? + order by playlist_songs.position asc + """, + [playlistid]) + +def _from_db(query, args=()): + songs_data = db.query(query, args) + tags, collabs = _get_info_for_songs(songs_data) + songs = [] + for sd in songs_data: + song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]] + song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]] + created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d") + has_pfp = users.user_has_pfp(sd["userid"]) + songs.append(Song( + sd["songid"], + sd["userid"], + sd["threadid"], + sd["username"], + sd["title"], + sanitize_user_text(sd["description"]), + created, + song_tags, + song_collabs, + has_pfp + )) + return songs + +def _get_info_for_songs(songs): + tags = {} + collabs = {} + for song in songs: + songid = song["songid"] + tags[songid] = db.query( + """ + select (tag) from song_tags + where songid = ? + """, + [songid]) + collabs[songid] = db.query( + """ + select (name) from song_collaborators + where songid = ? + """, + [songid]) + return tags, collabs @bp.get("/edit-song") def edit_song(): @@ -134,18 +206,24 @@ def edit_song(): songid = int(request.args["songid"]) except ValueError: # Invalid song id - file not found - current_app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}") + current_app.logger.warning( + f"Failed song edit - {session['username']} " + f"- invalid song ID {request.args['songid']}") abort(404) try: - song = Song.by_id(songid) + song = by_id(songid) if not song.userid == session["userid"]: # Can't edit someone else's song - 401 unauthorized - current_app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song") + current_app.logger.warning( + f"Failed song edit - {session['username']} " + "- attempted update for unowned song") abort(401) except ValueError: # Song doesn't exist - 404 file not found - current_app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})") + current_app.logger.warning( + f"Failed song edit - {session['username']} " + f"- song doesn't exist ({songid})") abort(404) return render_template("edit-song.html", song=song, **song_colors) @@ -170,7 +248,8 @@ def upload_song(): current_app.logger.info(f"{username} uploaded/modified a song") if "songid" in request.args: # After editing an existing song, go back to song page - return redirect(f"/song/{userid}/{request.args['songid']}?action=view") + return redirect( + f"/song/{userid}/{request.args['songid']}?action=view") else: # After creating a new song, go back to profile return redirect(f"/users/{username}") @@ -245,7 +324,8 @@ def update_song(): if passed: # Move file to permanent location - filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3") + user_songs_path = datadir.get_user_songs_path(session["userid"]) + filepath = user_songs_path / (str(song_data["songid"]) + ".mp3") shutil.move(tmp_file.name, filepath) else: error = True @@ -253,18 +333,31 @@ def update_song(): if not error: # Update songs table db.query( - "update songs set title = ?, description = ? where songid = ?", - [title, description, songid]) + """ + update songs set title = ?, description = ? + where songid = ? + """, + [title, description, songid]) # Update song_tags table db.query("delete from song_tags where songid = ?", [songid]) for tag in tags: - db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid]) + db.query( + """ + insert into song_tags (tag, songid) + values (?, ?) + """, + [tag, songid]) # Update song_collaborators table db.query("delete from song_collaborators where songid = ?", [songid]) for collab in collaborators: - db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid]) + db.query( + """ + insert into song_collaborators (name, songid) + values (?, ?) + """, + [collab, songid]) db.commit() flash_and_log(f"Successfully updated '{title}'", "success") @@ -286,25 +379,37 @@ def create_song(): return True else: # Create comment thread - threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"]) + threadid = comments.create_thread( + comments.ThreadType.SONG, session["userid"]) + # Create song timestamp = datetime.now(timezone.utc).isoformat() song_data = db.query( - "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)", - [session["userid"], title, description, timestamp, threadid], one=True) - songid = song_data["songid"] - filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3") + """ + insert into songs (userid, title, description, created, threadid) + values (?, ?, ?, ?, ?) + returning (songid) + """, + [session["userid"], title, description, timestamp, threadid], + one=True) # Move file to permanent location + user_songs_path = datadir.get_user_songs_path(session["userid"]) + filepath = user_songs_path / (str(song_data["songid"]) + ".mp3") shutil.move(tmp_file.name, filepath) # Assign tags + songid = song_data["songid"] for tag in tags: - db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid]) + db.query( + "insert into song_tags (tag, songid) values (?, ?)", + [tag, songid]) # Assign collaborators for collab in collaborators: - db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab]) + db.query( + "insert into song_collaborators (songid, name) values (?, ?)", + [songid, collab]) db.commit() @@ -340,7 +445,9 @@ def convert_song(tmp_file, request_file, yt_url): with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file: out_file.close() os.remove(out_file.name) - result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE) + result = subprocess.run( + ["ffmpeg", "-i", tmp_file.name, out_file.name], + stdout=subprocess.PIPE) if result.returncode == 0: # Successfully converted file, overwrite original file os.replace(out_file.name, tmp_file.name) @@ -363,16 +470,18 @@ def yt_import(tmp_file, yt_url): @bp.get("/delete-song/") def delete_song(songid): - - song_data = db.query("select * from songs where songid = ?", [songid], one=True) + song_data = db.query( + "select * from songs where songid = ?", [songid], one=True) if not song_data: - current_app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist") + current_app.logger.warning( + f"Failed song delete - {session['username']} - song doesn't exist") abort(404) # Song doesn't exist # Users can only delete their own songs if song_data["userid"] != session["userid"]: - current_app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song") + current_app.logger.warning( + f"Failed song delete - {session['username']} - user doesn't own song") abort(401) # Delete tags, collaborators @@ -388,7 +497,8 @@ def delete_song(songid): if songpath.exists(): os.remove(songpath) - current_app.logger.info(f"{session['username']} deleted song: {song_data['title']}") + current_app.logger.info( + f"{session['username']} deleted song: {song_data['title']}") flash_and_log(f"Deleted '{song_data['title']}'", "success") return redirect(f"/users/{session['username']}") @@ -397,7 +507,7 @@ def delete_song(songid): def song(userid, songid): if request.args.get("action", None) == "view": try: - song = Song.by_id(songid) + song = by_id(songid) if song.userid != userid: abort(404) @@ -409,7 +519,8 @@ def song(userid, songid): except ValueError: abort(404) else: - return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3") + return send_from_directory( + datadir.get_user_songs_path(userid), str(songid) + ".mp3") @bp.get("/songs") def view_songs(): @@ -421,13 +532,18 @@ def view_songs(): page_colors = users.get_user_colors(user) if tag and user: - page_songs = Song.get_all_for_username_and_tag(user, tag) + page_songs = get_all_for_username_and_tag(user, tag) elif tag: - page_songs = Song.get_all_for_tag(tag) + page_songs = get_all_for_tag(tag) elif user: - page_songs = Song.get_all_for_username(user) + page_songs = get_all_for_username(user) else: - page_songs = Song.get_random(50) - - return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors) + page_songs = get_random(50) + + return render_template( + "songs-by-tag.html", + user=user, + tag=tag, + songs=page_songs, + **page_colors)