]> littlesong.place Git - littlesongplace.git/commitdiff
Refactor songs
authorChris Fulljames <christianfulljames@gmail.com>
Sat, 5 Apr 2025 19:00:46 +0000 (15:00 -0400)
committerChris Fulljames <christianfulljames@gmail.com>
Sat, 5 Apr 2025 19:00:46 +0000 (15:00 -0400)
src/littlesongplace/__init__.py
src/littlesongplace/songs.py

index f49b983423012a1110e18eb0c722c8ea5b5e82bd..94b4021dff6ec272bd1ef3f24413e7848caaff94 100644 (file)
@@ -41,8 +41,9 @@ app = Flask(__name__)
 app.secret_key = os.environ["SECRET_KEY"] if "SECRET_KEY" in os.environ else "dev"
 app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 * 1024
 app.register_blueprint(auth.bp)
-app.register_blueprint(profiles.bp)
 app.register_blueprint(comments.bp)
+app.register_blueprint(profiles.bp)
+app.register_blueprint(songs.bp)
 db.init_app(app)
 
 if "DATA_DIR" in os.environ:
@@ -73,317 +74,6 @@ def index():
     page_songs = songs.Song.get_latest(50)
     return render_template("index.html", users=all_users, songs=page_songs, page_title=title)
 
-@app.get("/edit-song")
-def edit_song():
-    if not "userid" in session:
-        return redirect("/login")  # Must be logged in to edit
-
-    song = None
-
-    colors = users.get_user_colors(session["userid"])
-
-    if "songid" in request.args:
-        try:
-            songid = int(request.args["songid"])
-        except ValueError:
-            # Invalid song id - file not found
-            app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}")
-            abort(404)
-
-        try:
-            song = songs.Song.by_id(songid)
-            if not song.userid == session["userid"]:
-                # Can't edit someone else's song - 401 unauthorized
-                app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song")
-                abort(401)
-        except ValueError:
-            # Song doesn't exist - 404 file not found
-            app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})")
-            abort(404)
-
-    return render_template("edit-song.html", song=song, **colors)
-
-@app.post("/upload-song")
-def upload_song():
-    if not "userid" in session:
-        return redirect("/login")  # Must be logged in to edit
-
-    userid = session["userid"]
-
-    error = validate_song_form()
-
-    if not error:
-        if "songid" in request.args:
-            error = update_song()
-        else:
-            error = create_song()
-
-    if not error:
-        username = session["username"]
-        app.logger.info(f"{username} uploaded/modified a song")
-        if "songid" in request.args:
-            # After editing an existing song, go back to song page
-            return redirect(f"/song/{userid}/{request.args['songid']}?action=view")
-        else:
-            # After creating a new song, go back to profile
-            return redirect(f"/users/{username}")
-
-    else:
-        username = session["username"]
-        app.logger.info(f"Failed song update - {username}")
-        return redirect(request.referrer)
-
-def validate_song_form():
-    title = request.form["title"]
-    description = request.form["description"]
-
-    error = False
-
-    # Check if title is valid
-    if not title.isprintable():
-        flash_and_log(f"'{title}' is not a valid song title", "error")
-        error = True
-    elif len(title) > 80:
-        flash_and_log(f"Title cannot be more than 80 characters", "error")
-        error = True
-
-    # Check if description is valid
-    if len(description) > 10_000:
-        flash_and_log(f"Description cannot be more than 10k characters", "error")
-        error = True
-
-    # Check if tags are valid
-    tags = request.form["tags"]
-    tags = [t.strip() for t in tags.split(",")]
-    for tag in tags:
-        if not tag.isprintable() or len(tag) > 30:
-            flash_and_log(f"'{tag}' is not a valid tag name", "error")
-            error = True
-
-    # Check if collaborators are valid
-    collaborators = request.form["collabs"]
-    collaborators = [c.strip() for c in collaborators.split(",")]
-    for collab in collaborators:
-        if not collab.isprintable() or len(collab) > 31:  # 30ch username + @
-            flash_and_log(f"'{collab}' is not a valid collaborator name", "error")
-            error = True
-
-    return error
-
-def update_song():
-    songid = request.args["songid"]
-    try:
-        int(songid)
-    except ValueError:
-        abort(400)
-
-    file = request.files["song-file"] if "song-file" in request.files else None
-    yt_url = request.form["song-url"] if "song-url" in request.form else None
-    title = request.form["title"]
-    description = request.form["description"]
-    tags = [t.strip() for t in request.form["tags"].split(",") if t]
-    collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
-
-    # Make sure song exists and the logged-in user owns it
-    song_data = db.query("select * from songs where songid = ?", [songid], one=True)
-    if song_data is None:
-        abort(400)
-    elif session["userid"] != song_data["userid"]:
-        abort(401)
-
-    error = False
-    if file or yt_url:
-        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
-            passed = convert_song(tmp_file, file, yt_url)
-
-            if passed:
-                # Move file to permanent location
-                filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
-                shutil.move(tmp_file.name, filepath)
-            else:
-                error = True
-
-    if not error:
-        # Update songs table
-        db.query(
-                "update songs set title = ?, description = ? where songid = ?",
-                [title, description, songid])
-
-        # Update song_tags table
-        db.query("delete from song_tags where songid = ?", [songid])
-        for tag in tags:
-            db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
-
-        # Update song_collaborators table
-        db.query("delete from song_collaborators where songid = ?", [songid])
-        for collab in collaborators:
-            db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid])
-
-        db.commit()
-        flash_and_log(f"Successfully updated '{title}'", "success")
-
-    return error
-
-def create_song():
-    file = request.files["song-file"] if "song-file" in request.files else None
-    yt_url = request.form["song-url"] if "song-url" in request.form else None
-    title = request.form["title"]
-    description = request.form["description"]
-    tags = [t.strip() for t in request.form["tags"].split(",") if t]
-    collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
-
-    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
-        passed = convert_song(tmp_file, file, yt_url)
-
-        if not passed:
-            return True
-        else:
-            # Create comment thread
-            threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"])
-            # Create song
-            timestamp = datetime.now(timezone.utc).isoformat()
-            song_data = db.query(
-                    "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)",
-                    [session["userid"], title, description, timestamp, threadid], one=True)
-            songid = song_data["songid"]
-            filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
-
-            # Move file to permanent location
-            shutil.move(tmp_file.name, filepath)
-
-            # Assign tags
-            for tag in tags:
-                db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
-
-            # Assign collaborators
-            for collab in collaborators:
-                db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
-
-            db.commit()
-
-            flash_and_log(f"Successfully uploaded '{title}'", "success")
-            return False
-
-def convert_song(tmp_file, request_file, yt_url):
-    if request_file:
-        # Get uploaded file
-        request_file.save(tmp_file)
-        tmp_file.close()
-    else:
-        # Import from YouTube
-        tmp_file.close()
-        os.unlink(tmp_file.name)  # Delete file so yt-dlp doesn't complain
-        try:
-            yt_import(tmp_file, yt_url)
-        except DownloadError as ex:
-            app.logger.warning(str(ex))
-            flash_and_log(f"Failed to import from YouTube URL: {yt_url}")
-            return False
-
-    result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE)
-    res_stdout = result.stdout.decode()
-    app.logger.info(f"mpck result: \n {res_stdout}")
-    lines = res_stdout.split("\n")
-    lines = [l.strip().lower() for l in lines]
-    if any(l.startswith("result") and l.endswith("ok") for l in lines):
-        # Uploaded valid mp3 file
-        return True
-
-    # Not a valid mp3, try to convert with ffmpeg
-    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file:
-        out_file.close()
-        os.remove(out_file.name)
-        result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE)
-        if result.returncode == 0:
-            # Successfully converted file, overwrite original file
-            os.replace(out_file.name, tmp_file.name)
-            return True
-
-        if os.path.exists(out_file.name):
-            os.remove(out_file.name)
-
-    flash_and_log("Invalid audio file", "error")
-    return False
-
-def yt_import(tmp_file, yt_url):
-    ydl_opts = {
-        'format': 'm4a/bestaudio/best',
-        'outtmpl': tmp_file.name,
-        'logger': app.logger,
-    }
-    with YoutubeDL(ydl_opts) as ydl:
-        ydl.download([yt_url])
-
-@app.get("/delete-song/<int:songid>")
-def delete_song(songid):
-
-    song_data = db.query("select * from songs where songid = ?", [songid], one=True)
-
-    if not song_data:
-        app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist")
-        abort(404)  # Song doesn't exist
-
-    # Users can only delete their own songs
-    if song_data["userid"] != session["userid"]:
-        app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song")
-        abort(401)
-
-    # Delete tags, collaborators
-    db.query("delete from song_tags where songid = ?", [songid])
-    db.query("delete from song_collaborators where songid = ?", [songid])
-
-    # Delete song database entry
-    db.query("delete from songs where songid = ?", [songid])
-    db.commit()
-
-    # Delete song file from disk
-    songpath = datadir.get_user_songs_path(session["userid"]) / (str(songid) + ".mp3")
-    if songpath.exists():
-        os.remove(songpath)
-
-    app.logger.info(f"{session['username']} deleted song: {song_data['title']}")
-    flash_and_log(f"Deleted '{song_data['title']}'", "success")
-
-    return redirect(f"/users/{session['username']}")
-
-@app.get("/song/<int:userid>/<int:songid>")
-def song(userid, songid):
-    if request.args.get("action", None) == "view":
-        try:
-            song = songs.Song.by_id(songid)
-            if song.userid != userid:
-                abort(404)
-
-            return render_template(
-                    "song.html",
-                    songs=[song],
-                    song=song,
-                    **users.get_user_colors(userid))
-        except ValueError:
-            abort(404)
-    else:
-        return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3")
-
-@app.get("/songs")
-def view_songs():
-    tag = request.args.get("tag", None)
-    user = request.args.get("user", None)
-
-    page_colors = colors.DEFAULT_COLORS
-    if user:
-        page_colors = users.get_user_colors(user)
-
-    if tag and user:
-        page_songs = songs.Song.get_all_for_username_and_tag(user, tag)
-    elif tag:
-        page_songs = songs.Song.get_all_for_tag(tag)
-    elif user:
-        page_songs = songs.Song.get_all_for_username(user)
-    else:
-        page_songs = songs.Song.get_random(50)
-
-    return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors)
-
 @app.get("/activity")
 def activity():
     if not "userid" in session:
index 471da51b1bffcf35b18cca45a11af8f5d5d82f3b..96377425e29e9cf018f82d5c9688033ef7d3790a 100644 (file)
@@ -1,10 +1,20 @@
 import json
+import os
 import random
-from datetime import datetime
+import shutil
+import subprocess
+import tempfile
+from datetime import datetime, timezone
 from dataclasses import dataclass
 
-from . import comments, db, users
+from flask import Blueprint, current_app, render_template, request, redirect, \
+        session, abort, send_from_directory
+
+from . import comments, colors, datadir, db, users
 from .sanitize import sanitize_user_text
+from .logutils import flash_and_log
+
+bp = Blueprint("songs", __name__)
 
 @dataclass
 class Song:
@@ -110,3 +120,314 @@ class Song:
             collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid])
         return tags, collabs
 
+@bp.get("/edit-song")
+def edit_song():
+    if not "userid" in session:
+        return redirect("/login")  # Must be logged in to edit
+
+    song = None
+
+    song_colors = users.get_user_colors(session["userid"])
+
+    if "songid" in request.args:
+        try:
+            songid = int(request.args["songid"])
+        except ValueError:
+            # Invalid song id - file not found
+            current_app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}")
+            abort(404)
+
+        try:
+            song = Song.by_id(songid)
+            if not song.userid == session["userid"]:
+                # Can't edit someone else's song - 401 unauthorized
+                current_app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song")
+                abort(401)
+        except ValueError:
+            # Song doesn't exist - 404 file not found
+            current_app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})")
+            abort(404)
+
+    return render_template("edit-song.html", song=song, **song_colors)
+
+@bp.post("/upload-song")
+def upload_song():
+    if not "userid" in session:
+        return redirect("/login")  # Must be logged in to edit
+
+    userid = session["userid"]
+
+    error = validate_song_form()
+
+    if not error:
+        if "songid" in request.args:
+            error = update_song()
+        else:
+            error = create_song()
+
+    if not error:
+        username = session["username"]
+        current_app.logger.info(f"{username} uploaded/modified a song")
+        if "songid" in request.args:
+            # After editing an existing song, go back to song page
+            return redirect(f"/song/{userid}/{request.args['songid']}?action=view")
+        else:
+            # After creating a new song, go back to profile
+            return redirect(f"/users/{username}")
+
+    else:
+        username = session["username"]
+        current_app.logger.info(f"Failed song update - {username}")
+        return redirect(request.referrer)
+
+def validate_song_form():
+    title = request.form["title"]
+    description = request.form["description"]
+
+    error = False
+
+    # Check if title is valid
+    if not title.isprintable():
+        flash_and_log(f"'{title}' is not a valid song title", "error")
+        error = True
+    elif len(title) > 80:
+        flash_and_log(f"Title cannot be more than 80 characters", "error")
+        error = True
+
+    # Check if description is valid
+    if len(description) > 10_000:
+        flash_and_log(f"Description cannot be more than 10k characters", "error")
+        error = True
+
+    # Check if tags are valid
+    tags = request.form["tags"]
+    tags = [t.strip() for t in tags.split(",")]
+    for tag in tags:
+        if not tag.isprintable() or len(tag) > 30:
+            flash_and_log(f"'{tag}' is not a valid tag name", "error")
+            error = True
+
+    # Check if collaborators are valid
+    collaborators = request.form["collabs"]
+    collaborators = [c.strip() for c in collaborators.split(",")]
+    for collab in collaborators:
+        if not collab.isprintable() or len(collab) > 31:  # 30ch username + @
+            flash_and_log(f"'{collab}' is not a valid collaborator name", "error")
+            error = True
+
+    return error
+
+def update_song():
+    songid = request.args["songid"]
+    try:
+        int(songid)
+    except ValueError:
+        abort(400)
+
+    file = request.files["song-file"] if "song-file" in request.files else None
+    yt_url = request.form["song-url"] if "song-url" in request.form else None
+    title = request.form["title"]
+    description = request.form["description"]
+    tags = [t.strip() for t in request.form["tags"].split(",") if t]
+    collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
+
+    # Make sure song exists and the logged-in user owns it
+    song_data = db.query("select * from songs where songid = ?", [songid], one=True)
+    if song_data is None:
+        abort(400)
+    elif session["userid"] != song_data["userid"]:
+        abort(401)
+
+    error = False
+    if file or yt_url:
+        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+            passed = convert_song(tmp_file, file, yt_url)
+
+            if passed:
+                # Move file to permanent location
+                filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+                shutil.move(tmp_file.name, filepath)
+            else:
+                error = True
+
+    if not error:
+        # Update songs table
+        db.query(
+                "update songs set title = ?, description = ? where songid = ?",
+                [title, description, songid])
+
+        # Update song_tags table
+        db.query("delete from song_tags where songid = ?", [songid])
+        for tag in tags:
+            db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+
+        # Update song_collaborators table
+        db.query("delete from song_collaborators where songid = ?", [songid])
+        for collab in collaborators:
+            db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid])
+
+        db.commit()
+        flash_and_log(f"Successfully updated '{title}'", "success")
+
+    return error
+
+def create_song():
+    file = request.files["song-file"] if "song-file" in request.files else None
+    yt_url = request.form["song-url"] if "song-url" in request.form else None
+    title = request.form["title"]
+    description = request.form["description"]
+    tags = [t.strip() for t in request.form["tags"].split(",") if t]
+    collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
+
+    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+        passed = convert_song(tmp_file, file, yt_url)
+
+        if not passed:
+            return True
+        else:
+            # Create comment thread
+            threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"])
+            # Create song
+            timestamp = datetime.now(timezone.utc).isoformat()
+            song_data = db.query(
+                    "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)",
+                    [session["userid"], title, description, timestamp, threadid], one=True)
+            songid = song_data["songid"]
+            filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+
+            # Move file to permanent location
+            shutil.move(tmp_file.name, filepath)
+
+            # Assign tags
+            for tag in tags:
+                db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+
+            # Assign collaborators
+            for collab in collaborators:
+                db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
+
+            db.commit()
+
+            flash_and_log(f"Successfully uploaded '{title}'", "success")
+            return False
+
+def convert_song(tmp_file, request_file, yt_url):
+    if request_file:
+        # Get uploaded file
+        request_file.save(tmp_file)
+        tmp_file.close()
+    else:
+        # Import from YouTube
+        tmp_file.close()
+        os.unlink(tmp_file.name)  # Delete file so yt-dlp doesn't complain
+        try:
+            yt_import(tmp_file, yt_url)
+        except DownloadError as ex:
+            current_app.logger.warning(str(ex))
+            flash_and_log(f"Failed to import from YouTube URL: {yt_url}")
+            return False
+
+    result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE)
+    res_stdout = result.stdout.decode()
+    current_app.logger.info(f"mpck result: \n {res_stdout}")
+    lines = res_stdout.split("\n")
+    lines = [l.strip().lower() for l in lines]
+    if any(l.startswith("result") and l.endswith("ok") for l in lines):
+        # Uploaded valid mp3 file
+        return True
+
+    # Not a valid mp3, try to convert with ffmpeg
+    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file:
+        out_file.close()
+        os.remove(out_file.name)
+        result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE)
+        if result.returncode == 0:
+            # Successfully converted file, overwrite original file
+            os.replace(out_file.name, tmp_file.name)
+            return True
+
+        if os.path.exists(out_file.name):
+            os.remove(out_file.name)
+
+    flash_and_log("Invalid audio file", "error")
+    return False
+
+def yt_import(tmp_file, yt_url):
+    ydl_opts = {
+        'format': 'm4a/bestaudio/best',
+        'outtmpl': tmp_file.name,
+        'logger': current_app.logger,
+    }
+    with YoutubeDL(ydl_opts) as ydl:
+        ydl.download([yt_url])
+
+@bp.get("/delete-song/<int:songid>")
+def delete_song(songid):
+
+    song_data = db.query("select * from songs where songid = ?", [songid], one=True)
+
+    if not song_data:
+        current_app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist")
+        abort(404)  # Song doesn't exist
+
+    # Users can only delete their own songs
+    if song_data["userid"] != session["userid"]:
+        current_app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song")
+        abort(401)
+
+    # Delete tags, collaborators
+    db.query("delete from song_tags where songid = ?", [songid])
+    db.query("delete from song_collaborators where songid = ?", [songid])
+
+    # Delete song database entry
+    db.query("delete from songs where songid = ?", [songid])
+    db.commit()
+
+    # Delete song file from disk
+    songpath = datadir.get_user_songs_path(session["userid"]) / (str(songid) + ".mp3")
+    if songpath.exists():
+        os.remove(songpath)
+
+    current_app.logger.info(f"{session['username']} deleted song: {song_data['title']}")
+    flash_and_log(f"Deleted '{song_data['title']}'", "success")
+
+    return redirect(f"/users/{session['username']}")
+
+@bp.get("/song/<int:userid>/<int:songid>")
+def song(userid, songid):
+    if request.args.get("action", None) == "view":
+        try:
+            song = Song.by_id(songid)
+            if song.userid != userid:
+                abort(404)
+
+            return render_template(
+                    "song.html",
+                    songs=[song],
+                    song=song,
+                    **users.get_user_colors(userid))
+        except ValueError:
+            abort(404)
+    else:
+        return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3")
+
+@bp.get("/songs")
+def view_songs():
+    tag = request.args.get("tag", None)
+    user = request.args.get("user", None)
+
+    page_colors = colors.DEFAULT_COLORS
+    if user:
+        page_colors = users.get_user_colors(user)
+
+    if tag and user:
+        page_songs = Song.get_all_for_username_and_tag(user, tag)
+    elif tag:
+        page_songs = Song.get_all_for_tag(tag)
+    elif user:
+        page_songs = Song.get_all_for_username(user)
+    else:
+        page_songs = Song.get_random(50)
+
+    return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors)
+