From 6129f06edf6651d111f275bd42d4554e218b49a4 Mon Sep 17 00:00:00 2001 From: Chris Fulljames Date: Sat, 5 Apr 2025 15:00:46 -0400 Subject: [PATCH] Refactor songs --- src/littlesongplace/__init__.py | 314 +----------------------------- src/littlesongplace/songs.py | 325 +++++++++++++++++++++++++++++++- 2 files changed, 325 insertions(+), 314 deletions(-) diff --git a/src/littlesongplace/__init__.py b/src/littlesongplace/__init__.py index f49b983..94b4021 100644 --- a/src/littlesongplace/__init__.py +++ b/src/littlesongplace/__init__.py @@ -41,8 +41,9 @@ app = Flask(__name__) app.secret_key = os.environ["SECRET_KEY"] if "SECRET_KEY" in os.environ else "dev" app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 * 1024 app.register_blueprint(auth.bp) -app.register_blueprint(profiles.bp) app.register_blueprint(comments.bp) +app.register_blueprint(profiles.bp) +app.register_blueprint(songs.bp) db.init_app(app) if "DATA_DIR" in os.environ: @@ -73,317 +74,6 @@ def index(): page_songs = songs.Song.get_latest(50) return render_template("index.html", users=all_users, songs=page_songs, page_title=title) -@app.get("/edit-song") -def edit_song(): - if not "userid" in session: - return redirect("/login") # Must be logged in to edit - - song = None - - colors = users.get_user_colors(session["userid"]) - - if "songid" in request.args: - try: - songid = int(request.args["songid"]) - except ValueError: - # Invalid song id - file not found - app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}") - abort(404) - - try: - song = songs.Song.by_id(songid) - if not song.userid == session["userid"]: - # Can't edit someone else's song - 401 unauthorized - app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song") - abort(401) - except ValueError: - # Song doesn't exist - 404 file not found - app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})") - abort(404) - - return render_template("edit-song.html", song=song, **colors) - -@app.post("/upload-song") -def upload_song(): - if not "userid" in session: - return redirect("/login") # Must be logged in to edit - - userid = session["userid"] - - error = validate_song_form() - - if not error: - if "songid" in request.args: - error = update_song() - else: - error = create_song() - - if not error: - username = session["username"] - app.logger.info(f"{username} uploaded/modified a song") - if "songid" in request.args: - # After editing an existing song, go back to song page - return redirect(f"/song/{userid}/{request.args['songid']}?action=view") - else: - # After creating a new song, go back to profile - return redirect(f"/users/{username}") - - else: - username = session["username"] - app.logger.info(f"Failed song update - {username}") - return redirect(request.referrer) - -def validate_song_form(): - title = request.form["title"] - description = request.form["description"] - - error = False - - # Check if title is valid - if not title.isprintable(): - flash_and_log(f"'{title}' is not a valid song title", "error") - error = True - elif len(title) > 80: - flash_and_log(f"Title cannot be more than 80 characters", "error") - error = True - - # Check if description is valid - if len(description) > 10_000: - flash_and_log(f"Description cannot be more than 10k characters", "error") - error = True - - # Check if tags are valid - tags = request.form["tags"] - tags = [t.strip() for t in tags.split(",")] - for tag in tags: - if not tag.isprintable() or len(tag) > 30: - flash_and_log(f"'{tag}' is not a valid tag name", "error") - error = True - - # Check if collaborators are valid - collaborators = request.form["collabs"] - collaborators = [c.strip() for c in collaborators.split(",")] - for collab in collaborators: - if not collab.isprintable() or len(collab) > 31: # 30ch username + @ - flash_and_log(f"'{collab}' is not a valid collaborator name", "error") - error = True - - return error - -def update_song(): - songid = request.args["songid"] - try: - int(songid) - except ValueError: - abort(400) - - file = request.files["song-file"] if "song-file" in request.files else None - yt_url = request.form["song-url"] if "song-url" in request.form else None - title = request.form["title"] - description = request.form["description"] - tags = [t.strip() for t in request.form["tags"].split(",") if t] - collaborators = [c.strip() for c in request.form["collabs"].split(",") if c] - - # Make sure song exists and the logged-in user owns it - song_data = db.query("select * from songs where songid = ?", [songid], one=True) - if song_data is None: - abort(400) - elif session["userid"] != song_data["userid"]: - abort(401) - - error = False - if file or yt_url: - with tempfile.NamedTemporaryFile(delete=False) as tmp_file: - passed = convert_song(tmp_file, file, yt_url) - - if passed: - # Move file to permanent location - filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3") - shutil.move(tmp_file.name, filepath) - else: - error = True - - if not error: - # Update songs table - db.query( - "update songs set title = ?, description = ? where songid = ?", - [title, description, songid]) - - # Update song_tags table - db.query("delete from song_tags where songid = ?", [songid]) - for tag in tags: - db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid]) - - # Update song_collaborators table - db.query("delete from song_collaborators where songid = ?", [songid]) - for collab in collaborators: - db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid]) - - db.commit() - flash_and_log(f"Successfully updated '{title}'", "success") - - return error - -def create_song(): - file = request.files["song-file"] if "song-file" in request.files else None - yt_url = request.form["song-url"] if "song-url" in request.form else None - title = request.form["title"] - description = request.form["description"] - tags = [t.strip() for t in request.form["tags"].split(",") if t] - collaborators = [c.strip() for c in request.form["collabs"].split(",") if c] - - with tempfile.NamedTemporaryFile(delete=False) as tmp_file: - passed = convert_song(tmp_file, file, yt_url) - - if not passed: - return True - else: - # Create comment thread - threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"]) - # Create song - timestamp = datetime.now(timezone.utc).isoformat() - song_data = db.query( - "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)", - [session["userid"], title, description, timestamp, threadid], one=True) - songid = song_data["songid"] - filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3") - - # Move file to permanent location - shutil.move(tmp_file.name, filepath) - - # Assign tags - for tag in tags: - db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid]) - - # Assign collaborators - for collab in collaborators: - db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab]) - - db.commit() - - flash_and_log(f"Successfully uploaded '{title}'", "success") - return False - -def convert_song(tmp_file, request_file, yt_url): - if request_file: - # Get uploaded file - request_file.save(tmp_file) - tmp_file.close() - else: - # Import from YouTube - tmp_file.close() - os.unlink(tmp_file.name) # Delete file so yt-dlp doesn't complain - try: - yt_import(tmp_file, yt_url) - except DownloadError as ex: - app.logger.warning(str(ex)) - flash_and_log(f"Failed to import from YouTube URL: {yt_url}") - return False - - result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE) - res_stdout = result.stdout.decode() - app.logger.info(f"mpck result: \n {res_stdout}") - lines = res_stdout.split("\n") - lines = [l.strip().lower() for l in lines] - if any(l.startswith("result") and l.endswith("ok") for l in lines): - # Uploaded valid mp3 file - return True - - # Not a valid mp3, try to convert with ffmpeg - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file: - out_file.close() - os.remove(out_file.name) - result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE) - if result.returncode == 0: - # Successfully converted file, overwrite original file - os.replace(out_file.name, tmp_file.name) - return True - - if os.path.exists(out_file.name): - os.remove(out_file.name) - - flash_and_log("Invalid audio file", "error") - return False - -def yt_import(tmp_file, yt_url): - ydl_opts = { - 'format': 'm4a/bestaudio/best', - 'outtmpl': tmp_file.name, - 'logger': app.logger, - } - with YoutubeDL(ydl_opts) as ydl: - ydl.download([yt_url]) - -@app.get("/delete-song/") -def delete_song(songid): - - song_data = db.query("select * from songs where songid = ?", [songid], one=True) - - if not song_data: - app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist") - abort(404) # Song doesn't exist - - # Users can only delete their own songs - if song_data["userid"] != session["userid"]: - app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song") - abort(401) - - # Delete tags, collaborators - db.query("delete from song_tags where songid = ?", [songid]) - db.query("delete from song_collaborators where songid = ?", [songid]) - - # Delete song database entry - db.query("delete from songs where songid = ?", [songid]) - db.commit() - - # Delete song file from disk - songpath = datadir.get_user_songs_path(session["userid"]) / (str(songid) + ".mp3") - if songpath.exists(): - os.remove(songpath) - - app.logger.info(f"{session['username']} deleted song: {song_data['title']}") - flash_and_log(f"Deleted '{song_data['title']}'", "success") - - return redirect(f"/users/{session['username']}") - -@app.get("/song//") -def song(userid, songid): - if request.args.get("action", None) == "view": - try: - song = songs.Song.by_id(songid) - if song.userid != userid: - abort(404) - - return render_template( - "song.html", - songs=[song], - song=song, - **users.get_user_colors(userid)) - except ValueError: - abort(404) - else: - return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3") - -@app.get("/songs") -def view_songs(): - tag = request.args.get("tag", None) - user = request.args.get("user", None) - - page_colors = colors.DEFAULT_COLORS - if user: - page_colors = users.get_user_colors(user) - - if tag and user: - page_songs = songs.Song.get_all_for_username_and_tag(user, tag) - elif tag: - page_songs = songs.Song.get_all_for_tag(tag) - elif user: - page_songs = songs.Song.get_all_for_username(user) - else: - page_songs = songs.Song.get_random(50) - - return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors) - @app.get("/activity") def activity(): if not "userid" in session: diff --git a/src/littlesongplace/songs.py b/src/littlesongplace/songs.py index 471da51..9637742 100644 --- a/src/littlesongplace/songs.py +++ b/src/littlesongplace/songs.py @@ -1,10 +1,20 @@ import json +import os import random -from datetime import datetime +import shutil +import subprocess +import tempfile +from datetime import datetime, timezone from dataclasses import dataclass -from . import comments, db, users +from flask import Blueprint, current_app, render_template, request, redirect, \ + session, abort, send_from_directory + +from . import comments, colors, datadir, db, users from .sanitize import sanitize_user_text +from .logutils import flash_and_log + +bp = Blueprint("songs", __name__) @dataclass class Song: @@ -110,3 +120,314 @@ class Song: collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid]) return tags, collabs +@bp.get("/edit-song") +def edit_song(): + if not "userid" in session: + return redirect("/login") # Must be logged in to edit + + song = None + + song_colors = users.get_user_colors(session["userid"]) + + if "songid" in request.args: + try: + songid = int(request.args["songid"]) + except ValueError: + # Invalid song id - file not found + current_app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}") + abort(404) + + try: + song = Song.by_id(songid) + if not song.userid == session["userid"]: + # Can't edit someone else's song - 401 unauthorized + current_app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song") + abort(401) + except ValueError: + # Song doesn't exist - 404 file not found + current_app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})") + abort(404) + + return render_template("edit-song.html", song=song, **song_colors) + +@bp.post("/upload-song") +def upload_song(): + if not "userid" in session: + return redirect("/login") # Must be logged in to edit + + userid = session["userid"] + + error = validate_song_form() + + if not error: + if "songid" in request.args: + error = update_song() + else: + error = create_song() + + if not error: + username = session["username"] + current_app.logger.info(f"{username} uploaded/modified a song") + if "songid" in request.args: + # After editing an existing song, go back to song page + return redirect(f"/song/{userid}/{request.args['songid']}?action=view") + else: + # After creating a new song, go back to profile + return redirect(f"/users/{username}") + + else: + username = session["username"] + current_app.logger.info(f"Failed song update - {username}") + return redirect(request.referrer) + +def validate_song_form(): + title = request.form["title"] + description = request.form["description"] + + error = False + + # Check if title is valid + if not title.isprintable(): + flash_and_log(f"'{title}' is not a valid song title", "error") + error = True + elif len(title) > 80: + flash_and_log(f"Title cannot be more than 80 characters", "error") + error = True + + # Check if description is valid + if len(description) > 10_000: + flash_and_log(f"Description cannot be more than 10k characters", "error") + error = True + + # Check if tags are valid + tags = request.form["tags"] + tags = [t.strip() for t in tags.split(",")] + for tag in tags: + if not tag.isprintable() or len(tag) > 30: + flash_and_log(f"'{tag}' is not a valid tag name", "error") + error = True + + # Check if collaborators are valid + collaborators = request.form["collabs"] + collaborators = [c.strip() for c in collaborators.split(",")] + for collab in collaborators: + if not collab.isprintable() or len(collab) > 31: # 30ch username + @ + flash_and_log(f"'{collab}' is not a valid collaborator name", "error") + error = True + + return error + +def update_song(): + songid = request.args["songid"] + try: + int(songid) + except ValueError: + abort(400) + + file = request.files["song-file"] if "song-file" in request.files else None + yt_url = request.form["song-url"] if "song-url" in request.form else None + title = request.form["title"] + description = request.form["description"] + tags = [t.strip() for t in request.form["tags"].split(",") if t] + collaborators = [c.strip() for c in request.form["collabs"].split(",") if c] + + # Make sure song exists and the logged-in user owns it + song_data = db.query("select * from songs where songid = ?", [songid], one=True) + if song_data is None: + abort(400) + elif session["userid"] != song_data["userid"]: + abort(401) + + error = False + if file or yt_url: + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + passed = convert_song(tmp_file, file, yt_url) + + if passed: + # Move file to permanent location + filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3") + shutil.move(tmp_file.name, filepath) + else: + error = True + + if not error: + # Update songs table + db.query( + "update songs set title = ?, description = ? where songid = ?", + [title, description, songid]) + + # Update song_tags table + db.query("delete from song_tags where songid = ?", [songid]) + for tag in tags: + db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid]) + + # Update song_collaborators table + db.query("delete from song_collaborators where songid = ?", [songid]) + for collab in collaborators: + db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid]) + + db.commit() + flash_and_log(f"Successfully updated '{title}'", "success") + + return error + +def create_song(): + file = request.files["song-file"] if "song-file" in request.files else None + yt_url = request.form["song-url"] if "song-url" in request.form else None + title = request.form["title"] + description = request.form["description"] + tags = [t.strip() for t in request.form["tags"].split(",") if t] + collaborators = [c.strip() for c in request.form["collabs"].split(",") if c] + + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + passed = convert_song(tmp_file, file, yt_url) + + if not passed: + return True + else: + # Create comment thread + threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"]) + # Create song + timestamp = datetime.now(timezone.utc).isoformat() + song_data = db.query( + "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)", + [session["userid"], title, description, timestamp, threadid], one=True) + songid = song_data["songid"] + filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3") + + # Move file to permanent location + shutil.move(tmp_file.name, filepath) + + # Assign tags + for tag in tags: + db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid]) + + # Assign collaborators + for collab in collaborators: + db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab]) + + db.commit() + + flash_and_log(f"Successfully uploaded '{title}'", "success") + return False + +def convert_song(tmp_file, request_file, yt_url): + if request_file: + # Get uploaded file + request_file.save(tmp_file) + tmp_file.close() + else: + # Import from YouTube + tmp_file.close() + os.unlink(tmp_file.name) # Delete file so yt-dlp doesn't complain + try: + yt_import(tmp_file, yt_url) + except DownloadError as ex: + current_app.logger.warning(str(ex)) + flash_and_log(f"Failed to import from YouTube URL: {yt_url}") + return False + + result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE) + res_stdout = result.stdout.decode() + current_app.logger.info(f"mpck result: \n {res_stdout}") + lines = res_stdout.split("\n") + lines = [l.strip().lower() for l in lines] + if any(l.startswith("result") and l.endswith("ok") for l in lines): + # Uploaded valid mp3 file + return True + + # Not a valid mp3, try to convert with ffmpeg + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file: + out_file.close() + os.remove(out_file.name) + result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE) + if result.returncode == 0: + # Successfully converted file, overwrite original file + os.replace(out_file.name, tmp_file.name) + return True + + if os.path.exists(out_file.name): + os.remove(out_file.name) + + flash_and_log("Invalid audio file", "error") + return False + +def yt_import(tmp_file, yt_url): + ydl_opts = { + 'format': 'm4a/bestaudio/best', + 'outtmpl': tmp_file.name, + 'logger': current_app.logger, + } + with YoutubeDL(ydl_opts) as ydl: + ydl.download([yt_url]) + +@bp.get("/delete-song/") +def delete_song(songid): + + song_data = db.query("select * from songs where songid = ?", [songid], one=True) + + if not song_data: + current_app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist") + abort(404) # Song doesn't exist + + # Users can only delete their own songs + if song_data["userid"] != session["userid"]: + current_app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song") + abort(401) + + # Delete tags, collaborators + db.query("delete from song_tags where songid = ?", [songid]) + db.query("delete from song_collaborators where songid = ?", [songid]) + + # Delete song database entry + db.query("delete from songs where songid = ?", [songid]) + db.commit() + + # Delete song file from disk + songpath = datadir.get_user_songs_path(session["userid"]) / (str(songid) + ".mp3") + if songpath.exists(): + os.remove(songpath) + + current_app.logger.info(f"{session['username']} deleted song: {song_data['title']}") + flash_and_log(f"Deleted '{song_data['title']}'", "success") + + return redirect(f"/users/{session['username']}") + +@bp.get("/song//") +def song(userid, songid): + if request.args.get("action", None) == "view": + try: + song = Song.by_id(songid) + if song.userid != userid: + abort(404) + + return render_template( + "song.html", + songs=[song], + song=song, + **users.get_user_colors(userid)) + except ValueError: + abort(404) + else: + return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3") + +@bp.get("/songs") +def view_songs(): + tag = request.args.get("tag", None) + user = request.args.get("user", None) + + page_colors = colors.DEFAULT_COLORS + if user: + page_colors = users.get_user_colors(user) + + if tag and user: + page_songs = Song.get_all_for_username_and_tag(user, tag) + elif tag: + page_songs = Song.get_all_for_tag(tag) + elif user: + page_songs = Song.get_all_for_username(user) + else: + page_songs = Song.get_random(50) + + return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors) + -- 2.39.5