From: Chris Fulljames Date: Sat, 5 Apr 2025 18:39:52 +0000 (-0400) Subject: More refactoring X-Git-Url: https://littlesong.place/gitweb/?a=commitdiff_plain;h=58331ecd1e5c75759ba753170f3dcdef0cc0542f;p=littlesongplace.git More refactoring --- diff --git a/src/littlesongplace/__init__.py b/src/littlesongplace/__init__.py index 5c3ff73..f49b983 100644 --- a/src/littlesongplace/__init__.py +++ b/src/littlesongplace/__init__.py @@ -1,38 +1,26 @@ import base64 import enum -import json import logging import os import random import shutil import subprocess -import sys import tempfile -import uuid -from dataclasses import dataclass from datetime import datetime, timezone from logging.handlers import RotatingFileHandler -from pathlib import Path, PosixPath -from typing import Optional +from pathlib import Path -import bleach import click -from bleach.css_sanitizer import CSSSanitizer from flask import Flask, render_template, request, redirect, g, session, abort, \ send_from_directory, flash, get_flashed_messages -from PIL import Image, UnidentifiedImageError from werkzeug.utils import secure_filename from werkzeug.middleware.proxy_fix import ProxyFix from yt_dlp import YoutubeDL from yt_dlp.utils import DownloadError -from . import auth, comments, datadir, db +from . import auth, colors, comments, datadir, db, profiles, songs, users from .logutils import flash_and_log - -BGCOLOR = "#e8e6b5" -FGCOLOR = "#695c73" -ACCOLOR = "#9373a9" -DEFAULT_COLORS = dict(bgcolor=BGCOLOR, fgcolor=FGCOLOR, accolor=ACCOLOR) +from .sanitize import sanitize_user_text ################################################################################ # Logging @@ -53,6 +41,8 @@ app = Flask(__name__) app.secret_key = os.environ["SECRET_KEY"] if "SECRET_KEY" in os.environ else "dev" app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 * 1024 app.register_blueprint(auth.bp) +app.register_blueprint(profiles.bp) +app.register_blueprint(comments.bp) db.init_app(app) if "DATA_DIR" in os.environ: @@ -64,11 +54,11 @@ if "DATA_DIR" in os.environ: @app.route("/") def index(): - users = db.query("select * from users order by username asc") - users = [dict(row) for row in users] - for user in users: - user["has_pfp"] = user_has_pfp(user["userid"]) - for key, value in get_user_colors(user).items(): + all_users = db.query("select * from users order by username asc") + all_users = [dict(row) for row in all_users] + for user in all_users: + user["has_pfp"] = users.user_has_pfp(user["userid"]) + for key, value in users.get_user_colors(user).items(): user[key] = value titles = [ @@ -80,100 +70,8 @@ def index(): titles, weights = zip(*titles) title = random.choices(titles, weights)[0] - songs = Song.get_latest(50) - return render_template("index.html", users=users, songs=songs, page_title=title) - -@app.get("/users/") -def users_profile(profile_username): - - # Look up user data for current profile - profile_data = db.query("select * from users where username = ?", [profile_username], one=True) - if profile_data is None: - abort(404) - profile_userid = profile_data["userid"] - - # Get playlists for current profile - userid = session.get("userid", None) - show_private = userid == profile_userid - if show_private: - plist_data = db.query("select * from playlists where userid = ? order by updated desc", [profile_userid]) - else: - plist_data = db.query("select * from playlists where userid = ? and private = 0 order by updated desc", [profile_userid]) - - # Get songs for current profile - songs = Song.get_all_for_userid(profile_userid) - - # Get comments for current profile - profile_comments = get_comments(profile_data["threadid"]) - - # Sanitize bio - profile_bio = "" - if profile_data["bio"] is not None: - profile_bio = sanitize_user_text(profile_data["bio"]) - - return render_template( - "profile.html", - name=profile_username, - userid=profile_userid, - bio=profile_bio, - **get_user_colors(profile_data), - playlists=plist_data, - songs=songs, - comments=profile_comments, - threadid=profile_data["threadid"], - user_has_pfp=user_has_pfp(profile_userid)) - -@app.post("/edit-profile") -def edit_profile(): - if not "userid" in session: - abort(401) - - db.query( - "update users set bio = ?, bgcolor = ?, fgcolor = ?, accolor = ? where userid = ?", - [request.form["bio"], request.form["bgcolor"], request.form["fgcolor"], request.form["accolor"], session["userid"]]) - db.commit() - - if request.files["pfp"]: - pfp_path = datadir.get_user_images_path(session["userid"]) / "pfp.jpg" - - try: - with Image.open(request.files["pfp"]) as im: - # Drop alpha channel - if im.mode in ("RGBA", "P"): - im = im.convert("RGB") - - target_size = 256 # Square (same width/height) - # Resize - if im.width >= im.height: - scale = 256 / im.height - else: - scale = 256 / im.width - - im = im.resize((round(im.width*scale), round(im.height*scale))) - - # Crop to square - center_h = im.width / 2 - center_v = im.height / 2 - left = center_h - (target_size // 2) - right = center_h + (target_size // 2) - top = center_v - (target_size // 2) - bottom = center_v + (target_size // 2) - im = im.crop((left, top, right, bottom)) - - # Save to permanent location - im.save(pfp_path) - except UnidentifiedImageError: - abort(400) # Invalid image - - flash("Profile updated successfully") - - app.logger.info(f"{session['username']} updated bio") - - return redirect(f"/users/{session['username']}") - -@app.get("/pfp/") -def pfp(userid): - return send_from_directory(datadir.get_user_images_path(userid), "pfp.jpg") + page_songs = songs.Song.get_latest(50) + return render_template("index.html", users=all_users, songs=page_songs, page_title=title) @app.get("/edit-song") def edit_song(): @@ -182,7 +80,7 @@ def edit_song(): song = None - colors = get_user_colors(session["userid"]) + colors = users.get_user_colors(session["userid"]) if "songid" in request.args: try: @@ -193,7 +91,7 @@ def edit_song(): abort(404) try: - song = Song.by_id(songid) + song = songs.Song.by_id(songid) if not song.userid == session["userid"]: # Can't edit someone else's song - 401 unauthorized app.logger.warning(f"Failed song edit - {session['username']} - attempted update for unowned song") @@ -452,7 +350,7 @@ def delete_song(songid): def song(userid, songid): if request.args.get("action", None) == "view": try: - song = Song.by_id(songid) + song = songs.Song.by_id(songid) if song.userid != userid: abort(404) @@ -460,150 +358,31 @@ def song(userid, songid): "song.html", songs=[song], song=song, - **get_user_colors(userid)) + **users.get_user_colors(userid)) except ValueError: abort(404) else: return send_from_directory(datadir.get_user_songs_path(userid), str(songid) + ".mp3") @app.get("/songs") -def songs(): +def view_songs(): tag = request.args.get("tag", None) user = request.args.get("user", None) - colors = DEFAULT_COLORS + page_colors = colors.DEFAULT_COLORS if user: - colors = get_user_colors(user) + page_colors = users.get_user_colors(user) if tag and user: - songs = Song.get_all_for_username_and_tag(user, tag) + page_songs = songs.Song.get_all_for_username_and_tag(user, tag) elif tag: - songs = Song.get_all_for_tag(tag) + page_songs = songs.Song.get_all_for_tag(tag) elif user: - songs = Song.get_all_for_username(user) + page_songs = songs.Song.get_all_for_username(user) else: - songs = Song.get_random(50) - - return render_template("songs-by-tag.html", user=user, tag=tag, songs=songs, **colors) + page_songs = songs.Song.get_random(50) -@app.route("/comment", methods=["GET", "POST"]) -def comment(): - if not "userid" in session: - return redirect("/login") - - if not "threadid" in request.args: - abort(400) # Must have threadid - - thread = db.query("select * from comment_threads where threadid = ?", [request.args["threadid"]], one=True) - if not thread: - abort(404) # Invalid threadid - - # Check for comment being replied to - replyto = None - if "replytoid" in request.args: - replytoid = request.args["replytoid"] - replyto = db.query("select * from comments inner join users on comments.userid == users.userid where commentid = ?", [replytoid], one=True) - if not replyto: - abort(404) # Invalid comment - - # Check for comment being edited - comment = None - if "commentid" in request.args: - commentid = request.args["commentid"] - comment = db.query("select * from comments inner join users on comments.userid == users.userid where commentid = ?", [commentid], one=True) - if not comment: - abort(404) # Invalid comment - if comment["userid"] != session["userid"]: - abort(403) # User doesn't own this comment - - if request.method == "GET": - # Show the comment editor - session["previous_page"] = request.referrer - threadtype = thread["threadtype"] - song = None - profile = None - playlist = None - if threadtype == comments.ThreadType.SONG: - song = Song.by_threadid(request.args["threadid"]) - elif threadtype == comments.ThreadType.PROFILE: - profile = db.query("select * from users where threadid = ?", [request.args["threadid"]], one=True) - elif threadtype == comments.ThreadType.PLAYLIST: - profile = db.query("select * from playlists inner join users on playlists.userid = users.userid where playlists.threadid = ?", [request.args["threadid"]], one=True) - return render_template( - "comment.html", - song=song, - profile=profile, - playlist=playlist, - replyto=replyto, - comment=comment, - ) - - elif request.method == "POST": - # Add/update comment (user clicked the Post Comment button) - content = request.form["content"] - if comment: - # Update existing comment - db.query("update comments set content = ? where commentid = ?", args=[content, comment["commentid"]]) - else: - # Add new comment - timestamp = datetime.now(timezone.utc).isoformat() - userid = session["userid"] - replytoid = request.args.get("replytoid", None) - - threadid = request.args["threadid"] - comment = db.query( - "insert into comments (threadid, userid, replytoid, created, content) values (?, ?, ?, ?, ?) returning (commentid)", - args=[threadid, userid, replytoid, timestamp, content], one=True) - commentid = comment["commentid"] - - # Notify content owner - notification_targets = {thread["userid"]} - if replyto: - # Notify parent commenter - notification_targets.add(replyto["userid"]) - - # Notify previous repliers in thread - previous_replies = db.query("select * from comments where replytoid = ?", [replytoid]) - for reply in previous_replies: - notification_targets.add(reply["userid"]) - - # Don't notify the person who wrote the comment - if userid in notification_targets: - notification_targets.remove(userid) - - # Create notifications - for target in notification_targets: - db.query("insert into notifications (objectid, objecttype, targetuserid, created) values (?, ?, ?, ?)", [commentid, ObjectType.COMMENT, target, timestamp]) - - db.commit() - - return redirect_to_previous_page() - -def redirect_to_previous_page(): - previous_page = "/" - if "previous_page" in session: - previous_page = session["previous_page"] - session.pop("previous_page") - return redirect(previous_page) - -@app.get("/delete-comment/") -def comment_delete(commentid): - if "userid" not in session: - return redirect("/login") - - comment = db.query("select c.userid as comment_user, t.userid as thread_user from comments as c inner join comment_threads as t on c.threadid == t.threadid where commentid = ?", [commentid], one=True) - if not comment: - abort(404) # Invalid comment - - # Only commenter and song owner can delete comments - if not ((comment["comment_user"] == session["userid"]) - or (comment["thread_user"] == session["userid"])): - abort(403) - - db.query("delete from comments where (commentid = ?) or (replytoid = ?)", [commentid, commentid]) - db.commit() - - return redirect(request.referrer) + return render_template("songs-by-tag.html", user=user, tag=tag, songs=page_songs, **page_colors) @app.get("/activity") def activity(): @@ -622,13 +401,13 @@ def activity(): where (n.targetuserid = ?) and (n.objecttype = ?) order by c.created desc """, - [session["userid"], ObjectType.COMMENT]) + [session["userid"], comments.ObjectType.COMMENT]) notifications = [dict(c) for c in notifications] for comment in notifications: threadtype = comment["threadtype"] if threadtype == comments.ThreadType.SONG: - song = Song.by_threadid(comment["threadid"]) + song = songs.Song.by_threadid(comment["threadid"]) comment["songid"] = song.songid comment["title"] = song.title comment["content_userid"] = song.userid @@ -844,10 +623,10 @@ def playlists(playlistid): abort(404) # Cannot view other user's private playlist - pretend it doesn't even exist # Get songs - songs = Song.get_for_playlist(playlistid) + plist_songs = songs.Song.get_for_playlist(playlistid) # Get comments - plist_comments = get_comments(plist_data["threadid"]) + plist_comments = comments.for_thread(plist_data["threadid"]) # Show page return render_template( @@ -858,47 +637,10 @@ def playlists(playlistid): userid=plist_data["userid"], username=plist_data["username"], threadid=plist_data["threadid"], - **get_user_colors(plist_data), - songs=songs, + **users.get_user_colors(plist_data), + songs=plist_songs, comments=plist_comments) -def sanitize_user_text(text): - allowed_tags = bleach.sanitizer.ALLOWED_TAGS.union({ - 'area', 'br', 'div', 'img', 'map', 'hr', 'header', 'hgroup', 'table', 'tr', 'td', - 'th', 'thead', 'tbody', 'span', 'small', 'p', 'q', 'u', 'pre', - }) - allowed_attributes = { - "*": ["style"], "a": ["href", "title"], "abbr": ["title"], "acronym": ["title"], - "img": ["src", "alt", "usemap", "width", "height"], "map": ["name"], - "area": ["shape", "coords", "alt", "href"] - } - allowed_css_properties = { - "font-size", "font-style", "font-variant", "font-family", "font-weight", "color", - "background-color", "background-image", "border", "border-color", - "border-image", "width", "height" - } - css_sanitizer = CSSSanitizer(allowed_css_properties=allowed_css_properties) - return bleach.clean( - text, - tags=allowed_tags, - attributes=allowed_attributes, - css_sanitizer=css_sanitizer) - -def get_comments(threadid): - thread_comments = db.query("select * from comments inner join users on comments.userid == users.userid where comments.threadid = ?", [threadid]) - thread_comments = [dict(c) for c in thread_comments] - for c in thread_comments: - c["content"] = sanitize_user_text(c["content"]) - - # Top-level comments - song_comments = sorted([dict(c) for c in thread_comments if c["replytoid"] is None], key=lambda c: c["created"]) - song_comments = list(reversed(song_comments)) - # Replies (can only reply to top-level) - for comment in song_comments: - comment["replies"] = sorted([c for c in thread_comments if c["replytoid"] == comment["commentid"]], key=lambda c: c["created"]) - - return song_comments - def get_gif_data(): gifs = [] static_path = Path(__file__).parent / "static" @@ -918,30 +660,12 @@ def get_current_user_playlists(): return plist_data -def get_user_colors(user_data): - if isinstance(user_data, int): - # Get colors for userid - user_data = db.query("select * from users where userid = ?", [user_data], one=True) - elif isinstance(user_data, str): - # Get colors for username - user_data = db.query("select * from users where username = ?", [user_data], one=True) - - colors = dict(bgcolor=BGCOLOR, fgcolor=FGCOLOR, accolor=ACCOLOR) - for key in colors: - if user_data and user_data[key]: - colors[key] = user_data[key] - - return colors - -def user_has_pfp(userid): - return (datadir.get_user_images_path(userid)/"pfp.jpg").exists() - @app.context_processor def inject_global_vars(): return dict( gif_data=get_gif_data(), current_user_playlists=get_current_user_playlists(), - **DEFAULT_COLORS, + **colors.DEFAULT_COLORS, ) @@ -956,111 +680,3 @@ def gen_key(): import secrets print(secrets.token_hex()) -class ObjectType(enum.IntEnum): - COMMENT = 0 - -@dataclass -class Song: - songid: int - userid: int - threadid: int - username: str - title: str - description: str - created: str - tags: list[str] - collaborators: list[str] - user_has_pfp: bool - - def json(self): - return json.dumps(vars(self)) - - def get_comments(self): - return get_comments(self.threadid) - - @classmethod - def by_id(cls, songid): - songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid = ?", [songid]) - if not songs: - raise ValueError(f"No song for ID {songid:d}") - - return songs[0] - - @classmethod - def by_threadid(cls, threadid): - songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.threadid = ?", [threadid]) - if not songs: - raise ValueError(f"No song for Thread ID {songid:d}") - - return songs[0] - - @classmethod - def get_all_for_userid(cls, userid): - return cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.userid = ? order by songs.created desc", [userid]) - - @classmethod - def get_all_for_username(cls, username): - return cls._from_db("select * from songs inner join users on songs.userid = users.userid where users.username = ? order by songs.created desc", [username]) - - @classmethod - def get_all_for_username_and_tag(cls, username, tag): - return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (username = ? and tag = ?) order by songs.created desc", [username, tag]) - - @classmethod - def get_all_for_tag(cls, tag): - return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (tag = ?) order by songs.created desc", [tag]) - - @classmethod - def get_latest(cls, count): - return cls._from_db("select * from songs inner join users on songs.userid = users.userid order by songs.created desc limit ?", [count]) - - @classmethod - def get_random(cls, count): - # Get random songs + 10 extras so I can filter out my own (I uploaded too many :/) - songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid in (select songid from songs order by random() limit ?)", [count + 10]) - random.shuffle(songs) - - # Prevent my songs from showing up in the first 10 results - for i in reversed(range(min(10, len(songs)))): - if songs[i].username == "cfulljames": - del songs[i] - - # Drop any extra songs (since we asked for 10 extras) - songs = songs[:count] - - return songs - - @classmethod - def get_for_playlist(cls, playlistid): - return cls._from_db("""\ - select * from playlist_songs - inner join songs on playlist_songs.songid = songs.songid - inner join users on songs.userid = users.userid - where playlistid = ? - order by playlist_songs.position asc - """, [playlistid]) - - @classmethod - def _from_db(cls, query, args=()): - songs_data = db.query(query, args) - tags, collabs = cls._get_info_for_songs(songs_data) - songs = [] - for sd in songs_data: - song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]] - song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]] - created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d") - has_pfp = user_has_pfp(sd["userid"]) - songs.append(cls(sd["songid"], sd["userid"], sd["threadid"], sd["username"], sd["title"], sanitize_user_text(sd["description"]), created, song_tags, song_collabs, has_pfp)) - return songs - - @classmethod - def _get_info_for_songs(cls, songs): - tags = {} - collabs = {} - for song in songs: - songid = song["songid"] - tags[songid] = db.query("select (tag) from song_tags where songid = ?", [songid]) - collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid]) - return tags, collabs - - diff --git a/src/littlesongplace/colors.py b/src/littlesongplace/colors.py new file mode 100644 index 0000000..b10a857 --- /dev/null +++ b/src/littlesongplace/colors.py @@ -0,0 +1,5 @@ +BGCOLOR = "#e8e6b5" +FGCOLOR = "#695c73" +ACCOLOR = "#9373a9" +DEFAULT_COLORS = dict(bgcolor=BGCOLOR, fgcolor=FGCOLOR, accolor=ACCOLOR) + diff --git a/src/littlesongplace/comments.py b/src/littlesongplace/comments.py index e516438..1ddeaf8 100644 --- a/src/littlesongplace/comments.py +++ b/src/littlesongplace/comments.py @@ -1,14 +1,156 @@ import enum +from datetime import datetime, timezone -from . import db +from flask import abort, Blueprint, redirect, render_template, request, session -def create_thread(threadtype, userid): - thread = db.query("insert into comment_threads (threadtype, userid) values (?, ?) returning threadid", [threadtype, userid], one=True) - db.commit() - return thread["threadid"] +from . import db, songs +from .sanitize import sanitize_user_text + +bp = Blueprint("comments", __name__) class ThreadType(enum.IntEnum): SONG = 0 PROFILE = 1 PLAYLIST = 2 +class ObjectType(enum.IntEnum): + COMMENT = 0 + +def create_thread(threadtype, userid): + thread = db.query("insert into comment_threads (threadtype, userid) values (?, ?) returning threadid", [threadtype, userid], one=True) + db.commit() + return thread["threadid"] + +def for_thread(threadid): + thread_comments = db.query("select * from comments inner join users on comments.userid == users.userid where comments.threadid = ?", [threadid]) + thread_comments = [dict(c) for c in thread_comments] + for c in thread_comments: + c["content"] = sanitize_user_text(c["content"]) + + # Top-level comments + song_comments = sorted([dict(c) for c in thread_comments if c["replytoid"] is None], key=lambda c: c["created"]) + song_comments = list(reversed(song_comments)) + # Replies (can only reply to top-level) + for comment in song_comments: + comment["replies"] = sorted([c for c in thread_comments if c["replytoid"] == comment["commentid"]], key=lambda c: c["created"]) + + return song_comments + +@bp.route("/comment", methods=["GET", "POST"]) +def comment(): + if not "userid" in session: + return redirect("/login") + + if not "threadid" in request.args: + abort(400) # Must have threadid + + thread = db.query("select * from comment_threads where threadid = ?", [request.args["threadid"]], one=True) + if not thread: + abort(404) # Invalid threadid + + # Check for comment being replied to + replyto = None + if "replytoid" in request.args: + replytoid = request.args["replytoid"] + replyto = db.query("select * from comments inner join users on comments.userid == users.userid where commentid = ?", [replytoid], one=True) + if not replyto: + abort(404) # Invalid comment + + # Check for comment being edited + comment = None + if "commentid" in request.args: + commentid = request.args["commentid"] + comment = db.query("select * from comments inner join users on comments.userid == users.userid where commentid = ?", [commentid], one=True) + if not comment: + abort(404) # Invalid comment + if comment["userid"] != session["userid"]: + abort(403) # User doesn't own this comment + + if request.method == "GET": + # Show the comment editor + session["previous_page"] = request.referrer + threadtype = thread["threadtype"] + song = None + profile = None + playlist = None + if threadtype == ThreadType.SONG: + song = songs.Song.by_threadid(request.args["threadid"]) + elif threadtype == ThreadType.PROFILE: + profile = db.query("select * from users where threadid = ?", [request.args["threadid"]], one=True) + elif threadtype == ThreadType.PLAYLIST: + profile = db.query("select * from playlists inner join users on playlists.userid = users.userid where playlists.threadid = ?", [request.args["threadid"]], one=True) + return render_template( + "comment.html", + song=song, + profile=profile, + playlist=playlist, + replyto=replyto, + comment=comment, + ) + + elif request.method == "POST": + # Add/update comment (user clicked the Post Comment button) + content = request.form["content"] + if comment: + # Update existing comment + db.query("update comments set content = ? where commentid = ?", args=[content, comment["commentid"]]) + else: + # Add new comment + timestamp = datetime.now(timezone.utc).isoformat() + userid = session["userid"] + replytoid = request.args.get("replytoid", None) + + threadid = request.args["threadid"] + comment = db.query( + "insert into comments (threadid, userid, replytoid, created, content) values (?, ?, ?, ?, ?) returning (commentid)", + args=[threadid, userid, replytoid, timestamp, content], one=True) + commentid = comment["commentid"] + + # Notify content owner + notification_targets = {thread["userid"]} + if replyto: + # Notify parent commenter + notification_targets.add(replyto["userid"]) + + # Notify previous repliers in thread + previous_replies = db.query("select * from comments where replytoid = ?", [replytoid]) + for reply in previous_replies: + notification_targets.add(reply["userid"]) + + # Don't notify the person who wrote the comment + if userid in notification_targets: + notification_targets.remove(userid) + + # Create notifications + for target in notification_targets: + db.query("insert into notifications (objectid, objecttype, targetuserid, created) values (?, ?, ?, ?)", [commentid, ObjectType.COMMENT, target, timestamp]) + + db.commit() + + return redirect_to_previous_page() + +def redirect_to_previous_page(): + previous_page = "/" + if "previous_page" in session: + previous_page = session["previous_page"] + session.pop("previous_page") + return redirect(previous_page) + +@bp.get("/delete-comment/") +def comment_delete(commentid): + if "userid" not in session: + return redirect("/login") + + comment = db.query("select c.userid as comment_user, t.userid as thread_user from comments as c inner join comment_threads as t on c.threadid == t.threadid where commentid = ?", [commentid], one=True) + if not comment: + abort(404) # Invalid comment + + # Only commenter and song owner can delete comments + if not ((comment["comment_user"] == session["userid"]) + or (comment["thread_user"] == session["userid"])): + abort(403) + + db.query("delete from comments where (commentid = ?) or (replytoid = ?)", [commentid, commentid]) + db.commit() + + return redirect(request.referrer) diff --git a/src/littlesongplace/profiles.py b/src/littlesongplace/profiles.py new file mode 100644 index 0000000..fb8a4ca --- /dev/null +++ b/src/littlesongplace/profiles.py @@ -0,0 +1,100 @@ +from flask import abort, Blueprint, current_app, flash, send_from_directory, redirect, render_template, request, session +from PIL import Image, UnidentifiedImageError + +from . import comments, datadir, db, songs, users +from .sanitize import sanitize_user_text + +bp = Blueprint("profiles", __name__) + +@bp.get("/users/") +def users_profile(profile_username): + + # Look up user data for current profile + profile_data = db.query("select * from users where username = ?", [profile_username], one=True) + if profile_data is None: + abort(404) + profile_userid = profile_data["userid"] + + # Get playlists for current profile + userid = session.get("userid", None) + show_private = userid == profile_userid + if show_private: + plist_data = db.query("select * from playlists where userid = ? order by updated desc", [profile_userid]) + else: + plist_data = db.query("select * from playlists where userid = ? and private = 0 order by updated desc", [profile_userid]) + + # Get songs for current profile + profile_songs = songs.Song.get_all_for_userid(profile_userid) + + # Get comments for current profile + profile_comments = comments.for_thread(profile_data["threadid"]) + + # Sanitize bio + profile_bio = "" + if profile_data["bio"] is not None: + profile_bio = sanitize_user_text(profile_data["bio"]) + + return render_template( + "profile.html", + name=profile_username, + userid=profile_userid, + bio=profile_bio, + **users.get_user_colors(profile_data), + playlists=plist_data, + songs=profile_songs, + comments=profile_comments, + threadid=profile_data["threadid"], + user_has_pfp=users.user_has_pfp(profile_userid)) + +@bp.post("/edit-profile") +def edit_profile(): + if not "userid" in session: + abort(401) + + db.query( + "update users set bio = ?, bgcolor = ?, fgcolor = ?, accolor = ? where userid = ?", + [request.form["bio"], request.form["bgcolor"], request.form["fgcolor"], request.form["accolor"], session["userid"]]) + db.commit() + + if request.files["pfp"]: + pfp_path = datadir.get_user_images_path(session["userid"]) / "pfp.jpg" + + try: + with Image.open(request.files["pfp"]) as im: + # Drop alpha channel + if im.mode in ("RGBA", "P"): + im = im.convert("RGB") + + target_size = 256 # Square (same width/height) + # Resize + if im.width >= im.height: + scale = 256 / im.height + else: + scale = 256 / im.width + + im = im.resize((round(im.width*scale), round(im.height*scale))) + + # Crop to square + center_h = im.width / 2 + center_v = im.height / 2 + left = center_h - (target_size // 2) + right = center_h + (target_size // 2) + top = center_v - (target_size // 2) + bottom = center_v + (target_size // 2) + im = im.crop((left, top, right, bottom)) + + # Save to permanent location + im.save(pfp_path) + except UnidentifiedImageError: + abort(400) # Invalid image + + flash("Profile updated successfully") + + current_app.logger.info(f"{session['username']} updated bio") + + return redirect(f"/users/{session['username']}") + +@bp.get("/pfp/") +def pfp(userid): + return send_from_directory(datadir.get_user_images_path(userid), "pfp.jpg") + diff --git a/src/littlesongplace/sanitize.py b/src/littlesongplace/sanitize.py new file mode 100644 index 0000000..464585a --- /dev/null +++ b/src/littlesongplace/sanitize.py @@ -0,0 +1,25 @@ +import bleach +from bleach.css_sanitizer import CSSSanitizer + +def sanitize_user_text(text): + allowed_tags = bleach.sanitizer.ALLOWED_TAGS.union({ + 'area', 'br', 'div', 'img', 'map', 'hr', 'header', 'hgroup', 'table', 'tr', 'td', + 'th', 'thead', 'tbody', 'span', 'small', 'p', 'q', 'u', 'pre', + }) + allowed_attributes = { + "*": ["style"], "a": ["href", "title"], "abbr": ["title"], "acronym": ["title"], + "img": ["src", "alt", "usemap", "width", "height"], "map": ["name"], + "area": ["shape", "coords", "alt", "href"] + } + allowed_css_properties = { + "font-size", "font-style", "font-variant", "font-family", "font-weight", "color", + "background-color", "background-image", "border", "border-color", + "border-image", "width", "height" + } + css_sanitizer = CSSSanitizer(allowed_css_properties=allowed_css_properties) + return bleach.clean( + text, + tags=allowed_tags, + attributes=allowed_attributes, + css_sanitizer=css_sanitizer) + diff --git a/src/littlesongplace/songs.py b/src/littlesongplace/songs.py new file mode 100644 index 0000000..ec7b14e --- /dev/null +++ b/src/littlesongplace/songs.py @@ -0,0 +1,111 @@ +import json +from datetime import datetime +from dataclasses import dataclass + +from . import comments, db, users +from .sanitize import sanitize_user_text + +@dataclass +class Song: + songid: int + userid: int + threadid: int + username: str + title: str + description: str + created: str + tags: list[str] + collaborators: list[str] + user_has_pfp: bool + + def json(self): + return json.dumps(vars(self)) + + def get_comments(self): + return comments.for_thread(self.threadid) + + @classmethod + def by_id(cls, songid): + songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid = ?", [songid]) + if not songs: + raise ValueError(f"No song for ID {songid:d}") + + return songs[0] + + @classmethod + def by_threadid(cls, threadid): + songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.threadid = ?", [threadid]) + if not songs: + raise ValueError(f"No song for Thread ID {songid:d}") + + return songs[0] + + @classmethod + def get_all_for_userid(cls, userid): + return cls._from_db("select * from songs inner join users on songs.userid = users.userid where songs.userid = ? order by songs.created desc", [userid]) + + @classmethod + def get_all_for_username(cls, username): + return cls._from_db("select * from songs inner join users on songs.userid = users.userid where users.username = ? order by songs.created desc", [username]) + + @classmethod + def get_all_for_username_and_tag(cls, username, tag): + return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (username = ? and tag = ?) order by songs.created desc", [username, tag]) + + @classmethod + def get_all_for_tag(cls, tag): + return cls._from_db(f"select * from song_tags inner join songs on song_tags.songid = songs.songid inner join users on songs.userid = users.userid where (tag = ?) order by songs.created desc", [tag]) + + @classmethod + def get_latest(cls, count): + return cls._from_db("select * from songs inner join users on songs.userid = users.userid order by songs.created desc limit ?", [count]) + + @classmethod + def get_random(cls, count): + # Get random songs + 10 extras so I can filter out my own (I uploaded too many :/) + songs = cls._from_db("select * from songs inner join users on songs.userid = users.userid where songid in (select songid from songs order by random() limit ?)", [count + 10]) + random.shuffle(songs) + + # Prevent my songs from showing up in the first 10 results + for i in reversed(range(min(10, len(songs)))): + if songs[i].username == "cfulljames": + del songs[i] + + # Drop any extra songs (since we asked for 10 extras) + songs = songs[:count] + + return songs + + @classmethod + def get_for_playlist(cls, playlistid): + return cls._from_db("""\ + select * from playlist_songs + inner join songs on playlist_songs.songid = songs.songid + inner join users on songs.userid = users.userid + where playlistid = ? + order by playlist_songs.position asc + """, [playlistid]) + + @classmethod + def _from_db(cls, query, args=()): + songs_data = db.query(query, args) + tags, collabs = cls._get_info_for_songs(songs_data) + songs = [] + for sd in songs_data: + song_tags = [t["tag"] for t in tags[sd["songid"]] if t["tag"]] + song_collabs = [c["name"] for c in collabs[sd["songid"]] if c["name"]] + created = datetime.fromisoformat(sd["created"]).astimezone().strftime("%Y-%m-%d") + has_pfp = users.user_has_pfp(sd["userid"]) + songs.append(cls(sd["songid"], sd["userid"], sd["threadid"], sd["username"], sd["title"], sanitize_user_text(sd["description"]), created, song_tags, song_collabs, has_pfp)) + return songs + + @classmethod + def _get_info_for_songs(cls, songs): + tags = {} + collabs = {} + for song in songs: + songid = song["songid"] + tags[songid] = db.query("select (tag) from song_tags where songid = ?", [songid]) + collabs[songid] = db.query("select (name) from song_collaborators where songid = ?", [songid]) + return tags, collabs + diff --git a/src/littlesongplace/users.py b/src/littlesongplace/users.py new file mode 100644 index 0000000..73cc3fd --- /dev/null +++ b/src/littlesongplace/users.py @@ -0,0 +1,20 @@ +from . import colors, datadir, db + +def user_has_pfp(userid): + return (datadir.get_user_images_path(userid)/"pfp.jpg").exists() + +def get_user_colors(user_data): + if isinstance(user_data, int): + # Get colors for userid + user_data = db.query("select * from users where userid = ?", [user_data], one=True) + elif isinstance(user_data, str): + # Get colors for username + user_data = db.query("select * from users where username = ?", [user_data], one=True) + + user_colors = colors.DEFAULT_COLORS.copy() + for key in user_colors: + if user_data and user_data[key]: + user_colors[key] = user_data[key] + + return user_colors +