import os
+import shutil
import sqlite3
+import subprocess
import sys
+import tempfile
import uuid
from pathlib import Path, PosixPath
import bcrypt
import click
from flask import Flask, render_template, request, redirect, g, session, abort, \
- send_from_directory
+ send_from_directory, flash
from werkzeug.utils import secure_filename
################################################################################
password = request.form["password"]
password_confirm = request.form["password_confirm"]
- error = None
+ error = False
if not username.isidentifier():
- error = "Username cannot contain special characters"
+ flash("Username cannot contain special characters", "error")
+ error = True
elif len(username) < 3:
- error ="Username must be at least 3 characters"
+ flash("Username must be at least 3 characters", "error")
+ error = True
elif password != password_confirm:
- error = "Passwords do not match"
+ flash("Passwords do not match", "error")
+ error = True
elif len(password) < 8:
- error = "Password must be at least 8 characters"
+ flash("Password must be at least 8 characters", "error")
+ error = True
if query_db("select * from users where username = ?", [username], one=True):
- error = f"Username '{username}' is already taken"
+ flash(f"Username '{username}' is already taken", "error")
+ error = True
if error:
- return render_template("signup.html", error=error)
+ return redirect(request.referrer)
password = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
query_db("insert into users (username, password) values (?, ?)", [username, password])
session.permanent = True
return redirect("/")
- return render_template("login.html", error="Invalid username/password")
+ flash("Invalid username/password", "error")
+ return render_template("login.html")
@app.get("/logout")
def logout():
# Get songs for current profile
profile_userid = profile_data["userid"]
profile_songs_data = query_db("select * from songs where userid = ?", [profile_userid])
-
- return render_template("profile.html", name=profile_username, username=username, songs=profile_songs_data)
+ profile_songs_tags = {}
+ profile_songs_collabs = {}
+ for song in profile_songs_data:
+ songid = song["songid"]
+ profile_songs_tags[songid] = query_db("select (tag) from song_tags where songid = ?", [songid])
+ profile_songs_collabs[songid] = query_db("select (name) from song_collaborators where songid = ?", [songid])
+
+ return render_template(
+ "profile.html",
+ name=profile_username,
+ username=username,
+ songs=profile_songs_data,
+ songs_tags=profile_songs_tags,
+ songs_collaborators=profile_songs_collabs)
@app.post("/uploadsong")
def upload_song():
title = request.form["title"]
description = request.form["description"]
- error = None
+ error = False
+
+ # Check if title is valid
+ if not title.isprintable():
+ flash(f"'{title}' is not a valid song title", "error")
+ error = True
+
+ # Check if description is valid
+ if not description.isprintable():
+ flash(f"Description contains invalid characters", "error")
+ error = True
# Check if tags are valid
tags = request.form["tags"]
tags = [t.strip() for t in tags.split(",")]
for tag in tags:
- if not tag.isidentifier():
- error = f"'{tag}' is not a valid tag name"
- break
+ if not tag.isprintable():
+ flash(f"'{tag}' is not a valid tag name", "error")
+ error = True
# Check if collaborators are valid
collaborators = request.form["collabs"]
collaborators = [c.strip() for c in collaborators.split(",")]
- collab_ids = {}
for collab in collaborators:
- # Check if @user exists
- if collab.startswith("@"):
- collab_user_data = query_db("select * from users where username = ?", [collab[1:]], one=True)
- if collab_user_data is None:
- error = f"Invalid collaborator username: {collab}"
- break
+ if not collab.isprintable():
+ flash(f"'{collab}' is not a valid collaborator name", "error")
+ error = True
+
+ # Validate and save mp3 file
+ if not error:
+ with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+ file.save(tmp_file)
+ tmp_file.close()
+
+ result = subprocess.run(["mpck", tmp_file.name], stdout=subprocess.PIPE)
+ lines = result.stdout.decode().split("\r\n")
+ lines = [l.strip().lower() for l in lines]
+ passed = any(l.startswith("result") and l.endswith("ok") for l in lines)
+
+ if not passed:
+ flash("Invalid mp3 file", "error")
else:
- collab_ids[collab] = collab_user_data["userid"]
+ # Create song
+ song_data = query_db(
+ "insert into songs (userid, title, description) values (?, ?, ?) returning (songid)",
+ [userid, title, description], one=True)
+ songid = song_data["songid"]
+ filepath = userpath / (str(song_data["songid"]) + ".mp3")
- # Check if valid name
- elif not collab.isprintable():
- error = f"Invalid collaborator name: {collab}"
- break
+ # Move file to permanent location
+ shutil.move(tmp_file.name, filepath)
- # TODO: Handle errors above
- # TODO: Validate song file
+ # Assign tags
+ for tag in tags:
+ query_db("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
- # Create song
- song_data = query_db(
- "insert into songs (userid, title, description) values (?, ?, ?) returning (songid)",
- [userid, title, description], one=True)
- songid = song_data["songid"]
+ # Assign collaborators
+ for collab in collaborators:
+ query_db("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
- # Assign tags
- for tag in tags:
- query_db("insert into song_tags (tag, songid) values (?, ?)", [tag, songid])
-
- # List collaborators
- for collab in collaborators:
- if collab.startswith("@"):
- collab_id = collab_ids[collab]
- query_db("insert into song_collaborators (songid, userid) values (?, ?)", [songid, collab_id])
- else:
- query_db("insert into song_collaborators (songid, name) values (?, ?)", [songid, collab])
-
- get_db().commit()
+ get_db().commit()
- filepath = userpath / (str(song_data["songid"]) + ".mp3")
- file.save(filepath)
+ flash(f"Successfully uploaded '{title}'", "success")
- return redirect(f"/users/{username}")
+ return redirect(request.referrer)
@app.get("/song/<userid>/<songid>")
def song(userid, songid):