]> littlesong.place Git - littlesongplace.git/commitdiff
Clean up songs.py
authorChris Fulljames <christianfulljames@gmail.com>
Sun, 6 Apr 2025 11:31:28 +0000 (07:31 -0400)
committerChris Fulljames <christianfulljames@gmail.com>
Sun, 6 Apr 2025 11:31:28 +0000 (07:31 -0400)
src/littlesongplace/__init__.py
src/littlesongplace/comments.py
src/littlesongplace/profiles.py
src/littlesongplace/songs.py

index 94b4021dff6ec272bd1ef3f24413e7848caaff94..333b1daaa4ecd01f2610c77f2c879159dc6bd517 100644 (file)
@@ -71,7 +71,7 @@ def index():
     titles, weights = zip(*titles)
     title = random.choices(titles, weights)[0]
 
-    page_songs = songs.Song.get_latest(50)
+    page_songs = songs.get_latest(50)
     return render_template("index.html", users=all_users, songs=page_songs, page_title=title)
 
 @app.get("/activity")
@@ -97,7 +97,7 @@ def activity():
     for comment in notifications:
         threadtype = comment["threadtype"]
         if threadtype == comments.ThreadType.SONG:
-            song = songs.Song.by_threadid(comment["threadid"])
+            song = songs.by_threadid(comment["threadid"])
             comment["songid"] = song.songid
             comment["title"] = song.title
             comment["content_userid"] = song.userid
@@ -313,7 +313,7 @@ def playlists(playlistid):
             abort(404)  # Cannot view other user's private playlist - pretend it doesn't even exist
 
     # Get songs
-    plist_songs = songs.Song.get_for_playlist(playlistid)
+    plist_songs = songs.get_for_playlist(playlistid)
 
     # Get comments
     plist_comments = comments.for_thread(plist_data["threadid"])
index 1ddeaf8bfd1474b898e0f62900401f99f5801002..f12a857e74cd5202e0a92d9c55707980e37c1c1b 100644 (file)
@@ -74,7 +74,7 @@ def comment():
         profile = None
         playlist = None
         if threadtype == ThreadType.SONG:
-            song = songs.Song.by_threadid(request.args["threadid"])
+            song = songs.by_threadid(request.args["threadid"])
         elif threadtype == ThreadType.PROFILE:
             profile = db.query("select * from users where threadid = ?", [request.args["threadid"]], one=True)
         elif threadtype == ThreadType.PLAYLIST:
index fb8a4ca9d70a5f79e557c860abe5641bfaa7638e..2e25f620e71da1fce2cbc4c0602b11b91ea5883d 100644 (file)
@@ -24,7 +24,7 @@ def users_profile(profile_username):
         plist_data = db.query("select * from playlists where userid = ? and private = 0 order by updated desc", [profile_userid])
 
     # Get songs for current profile
-    profile_songs = songs.Song.get_all_for_userid(profile_userid)
+    profile_songs = songs.get_all_for_userid(profile_userid)
 
     # Get comments for current profile
     profile_comments = comments.for_thread(profile_data["threadid"])
index 96377425e29e9cf018f82d5c9688033ef7d3790a..62be5795e6cb5fc92fd474fab5863274efc6d238 100644 (file)
@@ -35,90 +35,162 @@ class Song:
     def get_comments(self):
         return comments.for_thread(self.threadid)
 
-    @classmethod
-    def by_id(cls, songid):
-        songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid = ?", [songid])
-        if not songs:
-            raise ValueError(f"No song for ID {songid:d}")
-
-        return songs[0]
-
-    @classmethod
-    def by_threadid(cls, threadid):
-        songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.threadid = ?", [threadid])
-        if not songs:
-            raise ValueError(f"No song for Thread ID {songid:d}")
-
-        return songs[0]
-
-    @classmethod
-    def get_all_for_userid(cls, userid):
-        return cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.userid = ? order by songs.created desc", [userid])
-
-    @classmethod
-    def get_all_for_username(cls, username):
-        return cls._from_db("select * from songs inner join users on songs.userid = users.userid where users.username = ? order by songs.created desc", [username])
-
-    @classmethod
-    def get_all_for_username_and_tag(cls, username, tag):
-        return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (username = ? and tag = ?) order by songs.created desc", [username, tag])
-
-    @classmethod
-    def get_all_for_tag(cls, tag):
-        return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (tag = ?) order by songs.created desc", [tag])
-
-    @classmethod
-    def get_latest(cls, count):
-        return cls._from_db("select * from songs inner join users on songs.userid = users.userid order by songs.created desc limit ?", [count])
-
-    @classmethod
-    def get_random(cls, count):
-        # Get random songs + 10 extras so I can filter out my own (I uploaded too many :/)
-        songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid in (select songid from songs order by random() limit ?)", [count + 10])
-        random.shuffle(songs)
-
-        # Prevent my songs from showing up in the first 10 results
-        for i in reversed(range(min(10, len(songs)))):
-            if songs[i].username == "cfulljames":
-                del songs[i]
-
-        # Drop any extra songs (since we asked for 10 extras)
-        songs = songs[:count]
-
-        return songs
-
-    @classmethod
-    def get_for_playlist(cls, playlistid):
-        return cls._from_db("""\
-            select * from playlist_songs
-            inner join songs on playlist_songs.songid = songs.songid
-            inner join users on songs.userid = users.userid
-            where playlistid = ?
-            order by playlist_songs.position asc
-            """, [playlistid])
-
-    @classmethod
-    def _from_db(cls, query, args=()):
-        songs_data = db.query(query, args)
-        tags, collabs = cls._get_info_for_songs(songs_data)
-        songs = []
-        for sd in songs_data:
-            song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]]
-            song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]]
-            created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d")
-            has_pfp = users.user_has_pfp(sd["userid"])
-            songs.append(cls(sd["songid"], sd["userid"], sd["threadid"], sd["username"], sd["title"], sanitize_user_text(sd["description"]), created, song_tags, song_collabs, has_pfp))
-        return songs
-
-    @classmethod
-    def _get_info_for_songs(cls, songs):
-        tags = {}
-        collabs = {}
-        for song in songs:
-            songid = song["songid"]
-            tags[songid] = db.query("select (tag) from song_tags where songid = ?", [songid])
-            collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid])
-        return tags, collabs
+def by_id(songid):
+    songs = _from_db(
+        """
+        select * from songs
+        inner join users on songs.userid = users.userid
+        where songid = ?
+        """,
+        [songid])
+    if not songs:
+        raise ValueError(f"No song for ID {songid:d}")
+
+    return songs[0]
+
+def by_threadid(threadid):
+    songs = _from_db(
+        """
+        select * from songs
+        inner join users on songs.userid = users.userid
+        where songs.threadid = ?
+        """,
+        [threadid])
+    if not songs:
+        raise ValueError(f"No song for Thread ID {songid:d}")
+
+    return songs[0]
+
+def get_all_for_userid(userid):
+    return _from_db(
+        """
+        select * from songs
+        inner join users on songs.userid = users.userid
+        where songs.userid = ?
+        order by songs.created desc
+        """,
+        [userid])
+
+def get_all_for_username(username):
+    return _from_db(
+        """
+        select * from songs
+        inner join users on songs.userid = users.userid
+        where users.username = ?
+        order by songs.created desc
+        """,
+        [username])
+
+def get_all_for_username_and_tag(username, tag):
+    return _from_db(
+        """
+        select * from song_tags
+        inner join songs on song_tags.songid = songs.songid
+        inner join users on songs.userid = users.userid
+        where (username = ? and tag = ?)
+        order by songs.created desc
+        """,
+        [username, tag])
+
+def get_all_for_tag(tag):
+    return _from_db(
+        """
+        select * from song_tags
+        inner join songs on song_tags.songid = songs.songid
+        inner join users on songs.userid = users.userid
+        where (tag = ?)
+        order by songs.created desc
+        """,
+        [tag])
+
+def get_latest(count):
+    return _from_db(
+        """
+        select * from songs
+        inner join users on songs.userid = users.userid
+        order by songs.created desc
+        limit ?
+        """,
+        [count])
+
+def get_random(count):
+    # Get random songs + 10 extras so I can filter out my own
+    # (I uploaded too many :/)
+    songs = _from_db(
+        """
+        select * from songs
+        inner join users on songs.userid = users.userid
+        where songid in (
+            select songid from songs
+            order by random()
+            limit ?
+        )
+        """,
+        [count + 10])
+    random.shuffle(songs)
+
+    # Prevent my songs from showing up in the first 10 results
+    for i in reversed(range(min(10, len(songs)))):
+        if songs[i].username == "cfulljames":
+            del songs[i]
+
+    # Drop any extra songs (since we asked for 10 extras)
+    songs = songs[:count]
+
+    return songs
+
+def get_for_playlist(playlistid):
+    return _from_db(
+        """
+        select * from playlist_songs
+        inner join songs on playlist_songs.songid = songs.songid
+        inner join users on songs.userid = users.userid
+        where playlistid = ?
+        order by playlist_songs.position asc
+        """,
+        [playlistid])
+
+def _from_db(query, args=()):
+    songs_data = db.query(query, args)
+    tags, collabs = _get_info_for_songs(songs_data)
+    songs = []
+    for sd in songs_data:
+        song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]]
+        song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]]
+        created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d")
+        has_pfp = users.user_has_pfp(sd["userid"])
+        songs.append(Song(
+            sd["songid"],
+            sd["userid"],
+            sd["threadid"],
+            sd["username"],
+            sd["title"],
+            sanitize_user_text(sd["description"]),
+            created,
+            song_tags,
+            song_collabs,
+            has_pfp
+        ))
+    return songs
+
+def _get_info_for_songs(songs):
+    tags = {}
+    collabs = {}
+    for song in songs:
+        songid = song["songid"]
+        tags[songid] = db.query(
+            """
+            select (tag) from song_tags
+            where songid = ?
+            """,
+            [songid])
+        collabs[songid] = db.query(
+            """
+            select (name) from song_collaborators
+            where songid = ?
+            """,
+            [songid])
+    return tags, collabs
 
 @bp.get("/edit-song")
 def edit_song():
@@ -134,18 +206,24 @@ def edit_song():
             songid = int(request.args["songid"])
         except ValueError:
             # Invalid song id - file not found
-            current_app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}")
+            current_app.logger.warning(
+                f"Failed song edit - {session['username']} "
+                f"- invalid song ID {request.args['songid']}")
             abort(404)
 
         try:
-            song = Song.by_id(songid)
+            song = by_id(songid)
             if not song.userid == session["userid"]:
                 # Can't edit someone else's song - 401 unauthorized
-                current_app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song")
+                current_app.logger.warning(
+                    f"Failed song edit - {session['username']} "
+                    "- attempted update for unowned song")
                 abort(401)
         except ValueError:
             # Song doesn't exist - 404 file not found
-            current_app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})")
+            current_app.logger.warning(
+                f"Failed song edit - {session['username']} "
+                f"- song doesn't exist ({songid})")
             abort(404)
 
     return render_template("edit-song.html", song=song, **song_colors)
@@ -170,7 +248,8 @@ def upload_song():
         current_app.logger.info(f"{username} uploaded/modified a song")
         if "songid" in request.args:
             # After editing an existing song, go back to song page
-            return redirect(f"/song/{userid}/{request.args['songid']}?action=view")
+            return redirect(
+                f"/song/{userid}/{request.args['songid']}?action=view")
         else:
             # After creating a new song, go back to profile
             return redirect(f"/users/{username}")
@@ -245,7 +324,8 @@ def update_song():
 
             if passed:
                 # Move file to permanent location
-                filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+                user_songs_path = datadir.get_user_songs_path(session["userid"])
+                filepath = user_songs_path / (str(song_data["songid"]) + ".mp3")
                 shutil.move(tmp_file.name, filepath)
             else:
                 error = True
@@ -253,18 +333,31 @@ def update_song():
     if not error:
         # Update songs table
         db.query(
-                "update songs set title = ?, description = ? where songid = ?",
-                [title, description, songid])
+            """
+            update songs set title = ?, description = ?
+            where songid = ?
+            """,
+            [title, description, songid])
 
         # Update song_tags table
         db.query("delete from song_tags where songid = ?", [songid])
         for tag in tags:
-            db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+            db.query(
+                """
+                insert into song_tags (tag, songid)
+                values (?, ?)
+                """,
+                [tag, songid])
 
         # Update song_collaborators table
         db.query("delete from song_collaborators where songid = ?", [songid])
         for collab in collaborators:
-            db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid])
+            db.query(
+                """
+                insert into song_collaborators (name, songid)
+                values (?, ?)
+                """,
+                [collab, songid])
 
         db.commit()
         flash_and_log(f"Successfully updated '{title}'", "success")
@@ -286,25 +379,37 @@ def create_song():
             return True
         else:
             # Create comment thread
-            threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"])
+            threadid = comments.create_thread(
+                comments.ThreadType.SONG, session["userid"])
+
             # Create song
             timestamp = datetime.now(timezone.utc).isoformat()
             song_data = db.query(
-                    "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)",
-                    [session["userid"], title, description, timestamp, threadid], one=True)
-            songid = song_data["songid"]
-            filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+                """
+                insert into songs (userid, title, description, created, threadid)
+                values (?, ?, ?, ?, ?)
+                returning (songid)
+                """,
+                [session["userid"], title, description, timestamp, threadid],
+                one=True)
 
             # Move file to permanent location
+            user_songs_path = datadir.get_user_songs_path(session["userid"])
+            filepath = user_songs_path / (str(song_data["songid"]) + ".mp3")
             shutil.move(tmp_file.name, filepath)
 
             # Assign tags
+            songid = song_data["songid"]
             for tag in tags:
-                db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+                db.query(
+                    "insert into song_tags (tag, songid) values (?, ?)",
+                    [tag, songid])
 
             # Assign collaborators
             for collab in collaborators:
-                db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
+                db.query(
+                    "insert into song_collaborators (songid, name) values (?, ?)",
+                    [songid, collab])
 
             db.commit()
 
@@ -340,7 +445,9 @@ def convert_song(tmp_file, request_file, yt_url):
     with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file:
         out_file.close()
         os.remove(out_file.name)
-        result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE)
+        result = subprocess.run(
+            ["ffmpeg", "-i", tmp_file.name, out_file.name],
+            stdout=subprocess.PIPE)
         if result.returncode == 0:
             # Successfully converted file, overwrite original file
             os.replace(out_file.name, tmp_file.name)
@@ -363,16 +470,18 @@ def yt_import(tmp_file, yt_url):
 
 @bp.get("/delete-song/<int:songid>")
 def delete_song(songid):
-
-    song_data = db.query("select * from songs where songid = ?", [songid], one=True)
+    song_data = db.query(
+        "select * from songs where songid = ?", [songid], one=True)
 
     if not song_data:
-        current_app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist")
+        current_app.logger.warning(
+            f"Failed song delete - {session['username']} - song doesn't exist")
         abort(404)  # Song doesn't exist
 
     # Users can only delete their own songs
     if song_data["userid"] != session["userid"]:
-        current_app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song")
+        current_app.logger.warning(
+            f"Failed song delete - {session['username']} - user doesn't own song")
         abort(401)
 
     # Delete tags, collaborators
@@ -388,7 +497,8 @@ def delete_song(songid):
     if songpath.exists():
         os.remove(songpath)
 
-    current_app.logger.info(f"{session['username']} deleted song: {song_data['title']}")
+    current_app.logger.info(
+        f"{session['username']} deleted song: {song_data['title']}")
     flash_and_log(f"Deleted '{song_data['title']}'", "success")
 
     return redirect(f"/users/{session['username']}")
@@ -397,7 +507,7 @@ def delete_song(songid):
 def song(userid, songid):
     if request.args.get("action", None) == "view":
         try:
-            song = Song.by_id(songid)
+            song = by_id(songid)
             if song.userid != userid:
                 abort(404)
 
@@ -409,7 +519,8 @@ def song(userid, songid):
         except ValueError:
             abort(404)
     else:
-        return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3")
+        return send_from_directory(
+            datadir.get_user_songs_path(userid), str(songid) + ".mp3")
 
 @bp.get("/songs")
 def view_songs():
@@ -421,13 +532,18 @@ def view_songs():
         page_colors = users.get_user_colors(user)
 
     if tag and user:
-        page_songs = Song.get_all_for_username_and_tag(user, tag)
+        page_songs = get_all_for_username_and_tag(user, tag)
     elif tag:
-        page_songs = Song.get_all_for_tag(tag)
+        page_songs = get_all_for_tag(tag)
     elif user:
-        page_songs = Song.get_all_for_username(user)
+        page_songs = get_all_for_username(user)
     else:
-        page_songs = Song.get_random(50)
-
-    return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors)
+        page_songs = get_random(50)
+
+    return render_template(
+        "songs-by-tag.html",
+        user=user,
+        tag=tag,
+        songs=page_songs,
+        **page_colors)