app.secret_key = os.environ["SECRET_KEY"] if "SECRET_KEY" in os.environ else "dev"
app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 * 1024
app.register_blueprint(auth.bp)
-app.register_blueprint(profiles.bp)
app.register_blueprint(comments.bp)
+app.register_blueprint(profiles.bp)
+app.register_blueprint(songs.bp)
db.init_app(app)
if "DATA_DIR" in os.environ:
page_songs = songs.Song.get_latest(50)
return render_template("index.html", users=all_users, songs=page_songs, page_title=title)
-@app.get("/edit-song")
-def edit_song():
- if not "userid" in session:
- return redirect("/login") # Must be logged in to edit
-
- song = None
-
- colors = users.get_user_colors(session["userid"])
-
- if "songid" in request.args:
- try:
- songid = int(request.args["songid"])
- except ValueError:
- # Invalid song id - file not found
- app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}")
- abort(404)
-
- try:
- song = songs.Song.by_id(songid)
- if not song.userid == session["userid"]:
- # Can't edit someone else's song - 401 unauthorized
- app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song")
- abort(401)
- except ValueError:
- # Song doesn't exist - 404 file not found
- app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})")
- abort(404)
-
- return render_template("edit-song.html", song=song, **colors)
-
-@app.post("/upload-song")
-def upload_song():
- if not "userid" in session:
- return redirect("/login") # Must be logged in to edit
-
- userid = session["userid"]
-
- error = validate_song_form()
-
- if not error:
- if "songid" in request.args:
- error = update_song()
- else:
- error = create_song()
-
- if not error:
- username = session["username"]
- app.logger.info(f"{username} uploaded/modified a song")
- if "songid" in request.args:
- # After editing an existing song, go back to song page
- return redirect(f"/song/{userid}/{request.args['songid']}?action=view")
- else:
- # After creating a new song, go back to profile
- return redirect(f"/users/{username}")
-
- else:
- username = session["username"]
- app.logger.info(f"Failed song update - {username}")
- return redirect(request.referrer)
-
-def validate_song_form():
- title = request.form["title"]
- description = request.form["description"]
-
- error = False
-
- # Check if title is valid
- if not title.isprintable():
- flash_and_log(f"'{title}' is not a valid song title", "error")
- error = True
- elif len(title) > 80:
- flash_and_log(f"Title cannot be more than 80 characters", "error")
- error = True
-
- # Check if description is valid
- if len(description) > 10_000:
- flash_and_log(f"Description cannot be more than 10k characters", "error")
- error = True
-
- # Check if tags are valid
- tags = request.form["tags"]
- tags = [t.strip() for t in tags.split(",")]
- for tag in tags:
- if not tag.isprintable() or len(tag) > 30:
- flash_and_log(f"'{tag}' is not a valid tag name", "error")
- error = True
-
- # Check if collaborators are valid
- collaborators = request.form["collabs"]
- collaborators = [c.strip() for c in collaborators.split(",")]
- for collab in collaborators:
- if not collab.isprintable() or len(collab) > 31: # 30ch username + @
- flash_and_log(f"'{collab}' is not a valid collaborator name", "error")
- error = True
-
- return error
-
-def update_song():
- songid = request.args["songid"]
- try:
- int(songid)
- except ValueError:
- abort(400)
-
- file = request.files["song-file"] if "song-file" in request.files else None
- yt_url = request.form["song-url"] if "song-url" in request.form else None
- title = request.form["title"]
- description = request.form["description"]
- tags = [t.strip() for t in request.form["tags"].split(",") if t]
- collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
-
- # Make sure song exists and the logged-in user owns it
- song_data = db.query("select * from songs where songid = ?", [songid], one=True)
- if song_data is None:
- abort(400)
- elif session["userid"] != song_data["userid"]:
- abort(401)
-
- error = False
- if file or yt_url:
- with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
- passed = convert_song(tmp_file, file, yt_url)
-
- if passed:
- # Move file to permanent location
- filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
- shutil.move(tmp_file.name, filepath)
- else:
- error = True
-
- if not error:
- # Update songs table
- db.query(
- "update songs set title = ?, description = ? where songid = ?",
- [title, description, songid])
-
- # Update song_tags table
- db.query("delete from song_tags where songid = ?", [songid])
- for tag in tags:
- db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
-
- # Update song_collaborators table
- db.query("delete from song_collaborators where songid = ?", [songid])
- for collab in collaborators:
- db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid])
-
- db.commit()
- flash_and_log(f"Successfully updated '{title}'", "success")
-
- return error
-
-def create_song():
- file = request.files["song-file"] if "song-file" in request.files else None
- yt_url = request.form["song-url"] if "song-url" in request.form else None
- title = request.form["title"]
- description = request.form["description"]
- tags = [t.strip() for t in request.form["tags"].split(",") if t]
- collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
-
- with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
- passed = convert_song(tmp_file, file, yt_url)
-
- if not passed:
- return True
- else:
- # Create comment thread
- threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"])
- # Create song
- timestamp = datetime.now(timezone.utc).isoformat()
- song_data = db.query(
- "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)",
- [session["userid"], title, description, timestamp, threadid], one=True)
- songid = song_data["songid"]
- filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
-
- # Move file to permanent location
- shutil.move(tmp_file.name, filepath)
-
- # Assign tags
- for tag in tags:
- db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
-
- # Assign collaborators
- for collab in collaborators:
- db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
-
- db.commit()
-
- flash_and_log(f"Successfully uploaded '{title}'", "success")
- return False
-
-def convert_song(tmp_file, request_file, yt_url):
- if request_file:
- # Get uploaded file
- request_file.save(tmp_file)
- tmp_file.close()
- else:
- # Import from YouTube
- tmp_file.close()
- os.unlink(tmp_file.name) # Delete file so yt-dlp doesn't complain
- try:
- yt_import(tmp_file, yt_url)
- except DownloadError as ex:
- app.logger.warning(str(ex))
- flash_and_log(f"Failed to import from YouTube URL: {yt_url}")
- return False
-
- result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE)
- res_stdout = result.stdout.decode()
- app.logger.info(f"mpck result: \n {res_stdout}")
- lines = res_stdout.split("\n")
- lines = [l.strip().lower() for l in lines]
- if any(l.startswith("result") and l.endswith("ok") for l in lines):
- # Uploaded valid mp3 file
- return True
-
- # Not a valid mp3, try to convert with ffmpeg
- with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file:
- out_file.close()
- os.remove(out_file.name)
- result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE)
- if result.returncode == 0:
- # Successfully converted file, overwrite original file
- os.replace(out_file.name, tmp_file.name)
- return True
-
- if os.path.exists(out_file.name):
- os.remove(out_file.name)
-
- flash_and_log("Invalid audio file", "error")
- return False
-
-def yt_import(tmp_file, yt_url):
- ydl_opts = {
- 'format': 'm4a/bestaudio/best',
- 'outtmpl': tmp_file.name,
- 'logger': app.logger,
- }
- with YoutubeDL(ydl_opts) as ydl:
- ydl.download([yt_url])
-
-@app.get("/delete-song/<int:songid>")
-def delete_song(songid):
-
- song_data = db.query("select * from songs where songid = ?", [songid], one=True)
-
- if not song_data:
- app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist")
- abort(404) # Song doesn't exist
-
- # Users can only delete their own songs
- if song_data["userid"] != session["userid"]:
- app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song")
- abort(401)
-
- # Delete tags, collaborators
- db.query("delete from song_tags where songid = ?", [songid])
- db.query("delete from song_collaborators where songid = ?", [songid])
-
- # Delete song database entry
- db.query("delete from songs where songid = ?", [songid])
- db.commit()
-
- # Delete song file from disk
- songpath = datadir.get_user_songs_path(session["userid"]) / (str(songid) + ".mp3")
- if songpath.exists():
- os.remove(songpath)
-
- app.logger.info(f"{session['username']} deleted song: {song_data['title']}")
- flash_and_log(f"Deleted '{song_data['title']}'", "success")
-
- return redirect(f"/users/{session['username']}")
-
-@app.get("/song/<int:userid>/<int:songid>")
-def song(userid, songid):
- if request.args.get("action", None) == "view":
- try:
- song = songs.Song.by_id(songid)
- if song.userid != userid:
- abort(404)
-
- return render_template(
- "song.html",
- songs=[song],
- song=song,
- **users.get_user_colors(userid))
- except ValueError:
- abort(404)
- else:
- return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3")
-
-@app.get("/songs")
-def view_songs():
- tag = request.args.get("tag", None)
- user = request.args.get("user", None)
-
- page_colors = colors.DEFAULT_COLORS
- if user:
- page_colors = users.get_user_colors(user)
-
- if tag and user:
- page_songs = songs.Song.get_all_for_username_and_tag(user, tag)
- elif tag:
- page_songs = songs.Song.get_all_for_tag(tag)
- elif user:
- page_songs = songs.Song.get_all_for_username(user)
- else:
- page_songs = songs.Song.get_random(50)
-
- return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors)
-
@app.get("/activity")
def activity():
if not "userid" in session:
import json
+import os
import random
-from datetime import datetime
+import shutil
+import subprocess
+import tempfile
+from datetime import datetime, timezone
from dataclasses import dataclass
-from . import comments, db, users
+from flask import Blueprint, current_app, render_template, request, redirect, \
+ session, abort, send_from_directory
+
+from . import comments, colors, datadir, db, users
from .sanitize import sanitize_user_text
+from .logutils import flash_and_log
+
+bp = Blueprint("songs", __name__)
@dataclass
class Song:
collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid])
return tags, collabs
+@bp.get("/edit-song")
+def edit_song():
+ if not "userid" in session:
+ return redirect("/login") # Must be logged in to edit
+
+ song = None
+
+ song_colors = users.get_user_colors(session["userid"])
+
+ if "songid" in request.args:
+ try:
+ songid = int(request.args["songid"])
+ except ValueError:
+ # Invalid song id - file not found
+ current_app.logger.warning(f"Failed song edit - {session['username']} - invalid song ID {request.args['songid']}")
+ abort(404)
+
+ try:
+ song = Song.by_id(songid)
+ if not song.userid == session["userid"]:
+ # Can't edit someone else's song - 401 unauthorized
+ current_app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song")
+ abort(401)
+ except ValueError:
+ # Song doesn't exist - 404 file not found
+ current_app.logger.warning(f"Failed song edit - {session['username']} - song doesn't exist ({songid})")
+ abort(404)
+
+ return render_template("edit-song.html", song=song, **song_colors)
+
+@bp.post("/upload-song")
+def upload_song():
+ if not "userid" in session:
+ return redirect("/login") # Must be logged in to edit
+
+ userid = session["userid"]
+
+ error = validate_song_form()
+
+ if not error:
+ if "songid" in request.args:
+ error = update_song()
+ else:
+ error = create_song()
+
+ if not error:
+ username = session["username"]
+ current_app.logger.info(f"{username} uploaded/modified a song")
+ if "songid" in request.args:
+ # After editing an existing song, go back to song page
+ return redirect(f"/song/{userid}/{request.args['songid']}?action=view")
+ else:
+ # After creating a new song, go back to profile
+ return redirect(f"/users/{username}")
+
+ else:
+ username = session["username"]
+ current_app.logger.info(f"Failed song update - {username}")
+ return redirect(request.referrer)
+
+def validate_song_form():
+ title = request.form["title"]
+ description = request.form["description"]
+
+ error = False
+
+ # Check if title is valid
+ if not title.isprintable():
+ flash_and_log(f"'{title}' is not a valid song title", "error")
+ error = True
+ elif len(title) > 80:
+ flash_and_log(f"Title cannot be more than 80 characters", "error")
+ error = True
+
+ # Check if description is valid
+ if len(description) > 10_000:
+ flash_and_log(f"Description cannot be more than 10k characters", "error")
+ error = True
+
+ # Check if tags are valid
+ tags = request.form["tags"]
+ tags = [t.strip() for t in tags.split(",")]
+ for tag in tags:
+ if not tag.isprintable() or len(tag) > 30:
+ flash_and_log(f"'{tag}' is not a valid tag name", "error")
+ error = True
+
+ # Check if collaborators are valid
+ collaborators = request.form["collabs"]
+ collaborators = [c.strip() for c in collaborators.split(",")]
+ for collab in collaborators:
+ if not collab.isprintable() or len(collab) > 31: # 30ch username + @
+ flash_and_log(f"'{collab}' is not a valid collaborator name", "error")
+ error = True
+
+ return error
+
+def update_song():
+ songid = request.args["songid"]
+ try:
+ int(songid)
+ except ValueError:
+ abort(400)
+
+ file = request.files["song-file"] if "song-file" in request.files else None
+ yt_url = request.form["song-url"] if "song-url" in request.form else None
+ title = request.form["title"]
+ description = request.form["description"]
+ tags = [t.strip() for t in request.form["tags"].split(",") if t]
+ collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
+
+ # Make sure song exists and the logged-in user owns it
+ song_data = db.query("select * from songs where songid = ?", [songid], one=True)
+ if song_data is None:
+ abort(400)
+ elif session["userid"] != song_data["userid"]:
+ abort(401)
+
+ error = False
+ if file or yt_url:
+ with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+ passed = convert_song(tmp_file, file, yt_url)
+
+ if passed:
+ # Move file to permanent location
+ filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+ shutil.move(tmp_file.name, filepath)
+ else:
+ error = True
+
+ if not error:
+ # Update songs table
+ db.query(
+ "update songs set title = ?, description = ? where songid = ?",
+ [title, description, songid])
+
+ # Update song_tags table
+ db.query("delete from song_tags where songid = ?", [songid])
+ for tag in tags:
+ db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+
+ # Update song_collaborators table
+ db.query("delete from song_collaborators where songid = ?", [songid])
+ for collab in collaborators:
+ db.query("insert into song_collaborators (name, songid) values (?, ?)", [collab, songid])
+
+ db.commit()
+ flash_and_log(f"Successfully updated '{title}'", "success")
+
+ return error
+
+def create_song():
+ file = request.files["song-file"] if "song-file" in request.files else None
+ yt_url = request.form["song-url"] if "song-url" in request.form else None
+ title = request.form["title"]
+ description = request.form["description"]
+ tags = [t.strip() for t in request.form["tags"].split(",") if t]
+ collaborators = [c.strip() for c in request.form["collabs"].split(",") if c]
+
+ with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+ passed = convert_song(tmp_file, file, yt_url)
+
+ if not passed:
+ return True
+ else:
+ # Create comment thread
+ threadid = comments.create_thread(comments.ThreadType.SONG, session["userid"])
+ # Create song
+ timestamp = datetime.now(timezone.utc).isoformat()
+ song_data = db.query(
+ "insert into songs (userid, title, description, created, threadid) values (?, ?, ?, ?, ?) returning (songid)",
+ [session["userid"], title, description, timestamp, threadid], one=True)
+ songid = song_data["songid"]
+ filepath = datadir.get_user_songs_path(session["userid"]) / (str(song_data["songid"]) + ".mp3")
+
+ # Move file to permanent location
+ shutil.move(tmp_file.name, filepath)
+
+ # Assign tags
+ for tag in tags:
+ db.query("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
+
+ # Assign collaborators
+ for collab in collaborators:
+ db.query("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
+
+ db.commit()
+
+ flash_and_log(f"Successfully uploaded '{title}'", "success")
+ return False
+
+def convert_song(tmp_file, request_file, yt_url):
+ if request_file:
+ # Get uploaded file
+ request_file.save(tmp_file)
+ tmp_file.close()
+ else:
+ # Import from YouTube
+ tmp_file.close()
+ os.unlink(tmp_file.name) # Delete file so yt-dlp doesn't complain
+ try:
+ yt_import(tmp_file, yt_url)
+ except DownloadError as ex:
+ current_app.logger.warning(str(ex))
+ flash_and_log(f"Failed to import from YouTube URL: {yt_url}")
+ return False
+
+ result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE)
+ res_stdout = result.stdout.decode()
+ current_app.logger.info(f"mpck result: \n {res_stdout}")
+ lines = res_stdout.split("\n")
+ lines = [l.strip().lower() for l in lines]
+ if any(l.startswith("result") and l.endswith("ok") for l in lines):
+ # Uploaded valid mp3 file
+ return True
+
+ # Not a valid mp3, try to convert with ffmpeg
+ with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out_file:
+ out_file.close()
+ os.remove(out_file.name)
+ result = subprocess.run(["ffmpeg", "-i", tmp_file.name, out_file.name], stdout=subprocess.PIPE)
+ if result.returncode == 0:
+ # Successfully converted file, overwrite original file
+ os.replace(out_file.name, tmp_file.name)
+ return True
+
+ if os.path.exists(out_file.name):
+ os.remove(out_file.name)
+
+ flash_and_log("Invalid audio file", "error")
+ return False
+
+def yt_import(tmp_file, yt_url):
+ ydl_opts = {
+ 'format': 'm4a/bestaudio/best',
+ 'outtmpl': tmp_file.name,
+ 'logger': current_app.logger,
+ }
+ with YoutubeDL(ydl_opts) as ydl:
+ ydl.download([yt_url])
+
+@bp.get("/delete-song/<int:songid>")
+def delete_song(songid):
+
+ song_data = db.query("select * from songs where songid = ?", [songid], one=True)
+
+ if not song_data:
+ current_app.logger.warning(f"Failed song delete - {session['username']} - song doesn't exist")
+ abort(404) # Song doesn't exist
+
+ # Users can only delete their own songs
+ if song_data["userid"] != session["userid"]:
+ current_app.logger.warning(f"Failed song delete - {session['username']} - user doesn't own song")
+ abort(401)
+
+ # Delete tags, collaborators
+ db.query("delete from song_tags where songid = ?", [songid])
+ db.query("delete from song_collaborators where songid = ?", [songid])
+
+ # Delete song database entry
+ db.query("delete from songs where songid = ?", [songid])
+ db.commit()
+
+ # Delete song file from disk
+ songpath = datadir.get_user_songs_path(session["userid"]) / (str(songid) + ".mp3")
+ if songpath.exists():
+ os.remove(songpath)
+
+ current_app.logger.info(f"{session['username']} deleted song: {song_data['title']}")
+ flash_and_log(f"Deleted '{song_data['title']}'", "success")
+
+ return redirect(f"/users/{session['username']}")
+
+@bp.get("/song/<int:userid>/<int:songid>")
+def song(userid, songid):
+ if request.args.get("action", None) == "view":
+ try:
+ song = Song.by_id(songid)
+ if song.userid != userid:
+ abort(404)
+
+ return render_template(
+ "song.html",
+ songs=[song],
+ song=song,
+ **users.get_user_colors(userid))
+ except ValueError:
+ abort(404)
+ else:
+ return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3")
+
+@bp.get("/songs")
+def view_songs():
+ tag = request.args.get("tag", None)
+ user = request.args.get("user", None)
+
+ page_colors = colors.DEFAULT_COLORS
+ if user:
+ page_colors = users.get_user_colors(user)
+
+ if tag and user:
+ page_songs = Song.get_all_for_username_and_tag(user, tag)
+ elif tag:
+ page_songs = Song.get_all_for_tag(tag)
+ elif user:
+ page_songs = Song.get_all_for_username(user)
+ else:
+ page_songs = Song.get_random(50)
+
+ return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors)
+