518 lines
15 KiB
Python
518 lines
15 KiB
Python
# builtins
|
|
import io
|
|
import random
|
|
import secrets
|
|
import datetime
|
|
|
|
# third-party
|
|
import PIL.Image, PIL.ImageDraw, PIL.ImageFont, PIL.ImageFilter # pillow, not built-in PIL
|
|
import markupsafe
|
|
import werkzeug
|
|
import flask
|
|
import peewee
|
|
|
|
# internals
|
|
from application import app
|
|
|
|
import util
|
|
import rendering
|
|
import form
|
|
import markdown
|
|
import admin
|
|
import tagging
|
|
|
|
class CommentCollection(admin.Administerable):
|
|
|
|
@werkzeug.utils.cached_property
|
|
def parent(self):
|
|
|
|
# find (and return) the Commentable instance using this CommentCollection, if any
|
|
# TODO: It should be possible to do this with one big JOINed statement
|
|
# NOTE: This is essentially a copy of TagCollection.parent
|
|
for cls in Commentable.__class_descendants__.values():
|
|
|
|
if cls not in app.models_abstract:
|
|
|
|
try:
|
|
return cls.select().where(cls.comment_collection == self).get()
|
|
except cls.DoesNotExist:
|
|
pass # continue to next loop iteration
|
|
|
|
return None
|
|
|
|
comment_markdown_help = """
|
|
Allows limited markdown, but no HTML.
|
|
|
|
**Emphasis:**
|
|
|
|
* `*italic*` → *italic*
|
|
* `**bold**` → **bold**
|
|
* `***bold italic***` → ***bold italic***
|
|
* `~~strikethrough~~` → ~~strikethrough~~
|
|
* `` `simple code` `` → `simple code`
|
|
|
|
**Blockquotes:**
|
|
|
|
*Input:*
|
|
|
|
```md
|
|
> This is a blockquote.
|
|
> It can span multiple lines and will appear indented.
|
|
```
|
|
|
|
*Output:*
|
|
|
|
> This is a blockquote.
|
|
> It can span multiple lines and will appear indented.
|
|
|
|
**Fenced code blocks:**
|
|
|
|
*Input:*
|
|
|
|
````md
|
|
```
|
|
def example(x):
|
|
if x >= 2:
|
|
do_stuff(x)
|
|
else:
|
|
do_thing(x)
|
|
```
|
|
````
|
|
|
|
*Output:*
|
|
|
|
```
|
|
def example(x):
|
|
if x >= 2:
|
|
do_stuff(x)
|
|
else:
|
|
do_thing(x)
|
|
```
|
|
"""
|
|
|
|
class Comment(admin.Administerable):
|
|
|
|
autoform_blacklist = ('id',)
|
|
|
|
collection = peewee.ForeignKeyField(CommentCollection, backref='items', null=False, on_delete='CASCADE')
|
|
reply_to = peewee.ForeignKeyField('self', null=True, on_delete='SET NULL', verbose_name='Reply to')
|
|
created = peewee.DateTimeField(default=datetime.datetime.utcnow, null=False)
|
|
captcha = peewee.CharField(null=False, default=util.random_string_light, verbose_name="Captcha", help_text="Case-sensitive")
|
|
captcha_passed = peewee.BooleanField(null=False, default=False, verbose_name="Passed captcha", help_text="Whether the comment passed the captcha challenge.")
|
|
moderation_passed = peewee.BooleanField(null=False, default=False, verbose_name="Passed moderation", help_text="Whether the comment passed moderation.")
|
|
nick = peewee.CharField(null=False, verbose_name='Your name')
|
|
text = markdown.SafeMarkdownTextField(verbose_name="Your message", help_text=comment_markdown_help)
|
|
|
|
@werkzeug.utils.cached_property
|
|
def replies(self):
|
|
|
|
# return replies that should be visible to visitors
|
|
|
|
return Comment.select().where(
|
|
(Comment.reply_to == self) &
|
|
(Comment.captcha_passed == True) &
|
|
(Comment.moderation_passed == True)
|
|
)
|
|
|
|
@werkzeug.utils.cached_property
|
|
def replies_all(self):
|
|
|
|
# ditto, but include replies that didn't pass captcha/moderation yet
|
|
|
|
return Comment.select().where(Comment.reply_to == self)
|
|
|
|
@werkzeug.utils.cached_property
|
|
def thread(self):
|
|
|
|
thread = {}
|
|
|
|
for reply in self.replies:
|
|
thread[reply] = reply.thread
|
|
|
|
return thread
|
|
|
|
@werkzeug.utils.cached_property
|
|
def thread_all(self):
|
|
|
|
thread = {}
|
|
|
|
for reply in self.replies_all:
|
|
thread[reply] = reply.thread_all
|
|
|
|
return thread
|
|
|
|
def reply_form(self):
|
|
|
|
commentable = self.collection.parent
|
|
comment = Comment(reply_to=self)
|
|
|
|
f = comment.form(
|
|
'create',
|
|
id=f'comment-reply-{self.id}',
|
|
action=flask.url_for('comment_post', type=commentable.__class__.__name__, name=commentable.name, reply_to_id=self.id)
|
|
)
|
|
|
|
f['collection_id'] = form.ValueField(value=self.collection_id)
|
|
f['reply_to_id'] = form.ValueField(value=self.id)
|
|
del f['created']
|
|
del f['captcha']
|
|
del f['captcha_passed']
|
|
del f['moderation_passed']
|
|
|
|
f['text'].id = f'comment-reply-{self.id}-text' # avoid id collision for help toggle
|
|
|
|
return f
|
|
|
|
@app.abstract
|
|
class Commentable(tagging.Taggable):
|
|
|
|
created = peewee.DateTimeField(default=datetime.datetime.utcnow, null=False)
|
|
published = peewee.BooleanField(null=False, default=False, verbose_name='Published')
|
|
comment_collection = peewee.ForeignKeyField(CommentCollection, null=True, on_delete='CASCADE')
|
|
comment_enabled = peewee.BooleanField(null=False, default=True, verbose_name="Activate commenting")
|
|
comment_moderated = peewee.BooleanField(null=False, default=False, verbose_name="Activate comment moderation")
|
|
|
|
autoform_blacklist = ('id', 'comment_collection_id')
|
|
|
|
@werkzeug.utils.cached_property
|
|
def comments(self):
|
|
|
|
# return all root-level comments that visitors should see.
|
|
# i.e. passed captcha and moderation (if any)
|
|
|
|
return Comment.select().where(
|
|
(Comment.collection == self.comment_collection) &
|
|
(Comment.reply_to == None) &
|
|
(Comment.captcha_passed == True) &
|
|
(Comment.moderation_passed == True)
|
|
)
|
|
|
|
@werkzeug.utils.cached_property
|
|
def comments_all(self):
|
|
|
|
# ditto, but includes comments that didn't pass captcha and/or moderation.
|
|
|
|
return Comment.select().where(
|
|
Comment.collection == self.comment_collection
|
|
)
|
|
|
|
@werkzeug.utils.cached_property
|
|
def comments_threaded(self):
|
|
|
|
# threaded representation of all comments that visitors should see.
|
|
|
|
comments_threaded = {}
|
|
|
|
for comment in self.comments:
|
|
comments_threaded[comment] = comment.thread
|
|
|
|
return comments_threaded
|
|
|
|
@werkzeug.utils.cached_property
|
|
def comments_threaded_all(self):
|
|
|
|
# ditto, but with comments that haven't passed captcha/moderation
|
|
|
|
comments_threaded = {}
|
|
|
|
for comment in self.comments_all:
|
|
comments_threaded[comment] = comment.thread_all
|
|
|
|
return comments_threaded
|
|
|
|
@classmethod
|
|
def list(cls):
|
|
return cls.select().where(cls.published == True).order_by(cls.created.desc())
|
|
|
|
def access(self, mode='full'):
|
|
|
|
if isinstance(flask.g.user, admin.User):
|
|
return True
|
|
|
|
# access allowed for anything but a list of "privileged" modes
|
|
return (mode not in ('create', 'edit', 'delete', 'admin-teaser')) and self.published
|
|
|
|
def render(self, mode='full', format='html'):
|
|
|
|
if not (self.published or flask.g.user):
|
|
return markupsafe.Markup('<div class="embed-error">Unpublished embedded content</div>')
|
|
|
|
return super().render(mode=mode, format=format)
|
|
|
|
def comment_form(self):
|
|
|
|
comment = Comment()
|
|
|
|
f = comment.form('create', action=flask.url_for('comment_post', type=self.__class__.__name__, name=self.name))
|
|
|
|
f['collection_id'] = form.ValueField(value=self.comment_collection_id)
|
|
f['reply_to_id'] = form.ValueField(value=None) # TODO: value
|
|
del f['created']
|
|
del f['captcha']
|
|
del f['captcha_passed']
|
|
del f['moderation_passed']
|
|
|
|
return f
|
|
|
|
@app.route('/comment/<string:type>/<string:name>/', methods=['GET', 'POST'])
|
|
@app.route('/comment/<string:type>/<string:name>/<int:reply_to_id>/', methods=['GET', 'POST'])
|
|
@rendering.page()
|
|
def comment_post(type, name, reply_to_id=None):
|
|
|
|
if not type in Commentable.__class_descendants__:
|
|
flask.abort(400)
|
|
|
|
cls = Commentable.__class_descendants__[type]
|
|
|
|
instance = cls.load(name)
|
|
|
|
# TODO: if not instance.published
|
|
|
|
if not instance.comment_enabled:
|
|
flask.abort(403)
|
|
|
|
collection = instance.comment_collection
|
|
|
|
if collection is None:
|
|
|
|
# if instance has no comment collection, create and assign it
|
|
collection = CommentCollection()
|
|
collection.save(force_insert=True)
|
|
|
|
instance.comment_collection = collection
|
|
instance.save()
|
|
|
|
if reply_to_id is not None:
|
|
comment = Comment.load(reply_to_id)
|
|
f = comment.reply_form()
|
|
else:
|
|
f = instance.comment_form()
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
f.handle()
|
|
|
|
if f.administerable.is_in_db: # successfully saved
|
|
return flask.redirect(flask.url_for('comment_captcha', type=type, name=name, id=f.administerable.id))
|
|
|
|
return f
|
|
|
|
@app.route('/comment/<string:type>/<string:name>/captcha/<int:id>/', methods=['GET', 'POST'])
|
|
@rendering.page()
|
|
def comment_captcha(type, name, id):
|
|
|
|
if not type in Commentable.__class_descendants__:
|
|
flask.abort(400)
|
|
|
|
cls = Commentable.__class_descendants__[type]
|
|
|
|
instance = cls.load(name)
|
|
|
|
if not instance.comment_enabled:
|
|
flask.abort(403)
|
|
|
|
comment = Comment.load(id)
|
|
|
|
if comment.captcha_passed:
|
|
flask.abort(400)
|
|
|
|
captcha_url = flask.url_for('serve_captcha', id=id)
|
|
|
|
f = form.Form()
|
|
f.intro = markupsafe.Markup(f'<img src="{captcha_url}" alt="Solve this captcha to continue" />')
|
|
f['captcha'] = form.Text(label='Captcha', help='Case-sensitive', required=True)
|
|
f.buttons['submit'] = form.Button(label='Continue')
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
f.handle() # binds and validates form
|
|
|
|
if f.valid:
|
|
|
|
if f['captcha'].value == comment.captcha:
|
|
|
|
comment.captcha_passed = True
|
|
|
|
if not instance.comment_moderated:
|
|
comment.moderation_passed = True
|
|
|
|
comment.save()
|
|
|
|
if instance.comment_moderated:
|
|
flask.flash("Your comment has been received and is awaiting moderation.", 'success')
|
|
else:
|
|
flask.flash("Your comment has been posted.", 'success')
|
|
return flask.redirect(instance.url('full'))
|
|
|
|
else:
|
|
flask.flash("You entered the captcha incorrectly. Make sure you enter the correct upper- and lowercase letters.", 'error')
|
|
|
|
return f
|
|
|
|
def font_getsize(font, text):
|
|
|
|
bbox = font.getbbox(text)
|
|
#return (bbox[2] - bbox[0], bbox[3] - bbox[1]) # old and naive, broken with most fonts
|
|
#return (bbox[2] - bbox[1], bbox[3] + 1) # https://stackoverflow.com/a/77999858 (+1 for subpixel stuff [i.e. clean AA]), works for most fonts
|
|
return (bbox[2], bbox[3] + 1) # most conservative solution, should work for every font
|
|
|
|
@app.route('/comment/captcha/<int:id>/')
|
|
def serve_captcha(id):
|
|
|
|
comment = Comment.load(id)
|
|
|
|
colors = [
|
|
(0,128,255),
|
|
(0,255,128),
|
|
(128,0,255),
|
|
(128,255,0),
|
|
(255,0,128),
|
|
(255,128,0)
|
|
]
|
|
|
|
font_path = 'themes/default/assets/fonts/Michroma/Michroma-Regular.ttf'
|
|
image = PIL.Image.new('RGBA', (320, 80), (255,255,255,0))
|
|
font = PIL.ImageFont.truetype(font_path, 42)
|
|
|
|
|
|
#x_jitter = ((image.width/10) * -1, 0)
|
|
#y_jitter = ((image.height/10) * -1, image.height/10)
|
|
x_jitter = (-5, 15)
|
|
y_jitter = (-15, 5)
|
|
|
|
textsize = font_getsize(font, ' '.join(comment.captcha))
|
|
centered = (image.width / 2 - textsize[0] / 2, image.height / 2 - textsize[1] / 2)
|
|
|
|
x = centered[0] + random.randint(x_jitter[0], x_jitter[1])
|
|
y = centered[1] + random.randint(y_jitter[0], y_jitter[1])
|
|
baseline = centered[1]
|
|
|
|
for char in comment.captcha:
|
|
|
|
c = colors[random.randint(0, len(colors) -1)]
|
|
c = tuple(list(c) + [random.randint(255,255)])
|
|
|
|
char_size = font_getsize(font, char)
|
|
|
|
char_wrapped = f' {char} '
|
|
char_wrapped_size = font_getsize(font, char_wrapped)
|
|
|
|
char_layer = PIL.Image.new('RGBA', char_wrapped_size, (255,255,255,0))
|
|
char_draw = PIL.ImageDraw.Draw(char_layer)
|
|
|
|
char_draw.text((0,0), char_wrapped, c, font=font)
|
|
char_layer = char_layer.rotate(random.randint(-15, 15), expand=True, resample=PIL.Image.BICUBIC)
|
|
|
|
image.paste(
|
|
char_layer,
|
|
(int(x), int(y)),
|
|
mask=char_layer
|
|
)
|
|
|
|
x += char_size[0] + random.randint(x_jitter[0], x_jitter[1])
|
|
y = baseline + random.randint(y_jitter[0], y_jitter[1])
|
|
|
|
shine = image.filter(PIL.ImageFilter.GaussianBlur(radius=8))
|
|
image = PIL.Image.alpha_composite(image, shine)
|
|
|
|
out = io.BytesIO()
|
|
image.save(out, format='PNG')
|
|
|
|
return flask.Response(
|
|
out.getvalue(),
|
|
mimetype='image/png'
|
|
)
|
|
|
|
class ModerationForm(form.Form):
|
|
|
|
def __init__(self, filter, **kwargs):
|
|
|
|
super().__init__(**kwargs)
|
|
|
|
comments = Comment.select().order_by(Comment.collection)
|
|
|
|
if filter == 'awaiting_moderation':
|
|
comments = comments.where((Comment.captcha_passed == True) & (Comment.moderation_passed == False))
|
|
elif filter == 'awaiting_captcha':
|
|
comments = comments.where(Comment.captcha_passed == False)
|
|
elif filter == 'public':
|
|
comments = comments.where((Comment.captcha_passed == True) & (Comment.moderation_passed == True))
|
|
elif filter == 'all':
|
|
pass
|
|
else:
|
|
raise ValueError("Invalid filter")
|
|
|
|
self.filters = rendering.Menu(
|
|
'comment-moderation-actions',
|
|
items=(
|
|
{
|
|
'endpoint': 'admin.comment_moderation',
|
|
'params': {'filter': 'awaiting_moderation'},
|
|
'label': 'Awaiting moderation',
|
|
},
|
|
{
|
|
'endpoint': 'admin.comment_moderation',
|
|
'params': {'filter': 'awaiting_captcha'},
|
|
'label': 'Awaiting Captcha',
|
|
},
|
|
{
|
|
'endpoint': 'admin.comment_moderation',
|
|
'params': {'filter': 'public'},
|
|
'label': 'Public',
|
|
},
|
|
{
|
|
'endpoint': 'admin.comment_moderation',
|
|
'params': {'filter': 'all'},
|
|
'label': 'All',
|
|
},
|
|
)
|
|
)
|
|
|
|
self.comments = form.MultiGroup(form.types.INT, 'comments', required=True)
|
|
|
|
for comment in comments:
|
|
|
|
fs = form.Fieldset(name=f'comment-{comment.id}')
|
|
fs[f'comment-preview-{comment.id}'] = form.RenderableField(comment, mode='inline')
|
|
fs[f'comment-{comment.id}'] = form.Checkbox(group=self.comments, value=comment.id)
|
|
|
|
self[fs.name] = fs
|
|
|
|
self.buttons['approve'] = form.Button(label='Approve')
|
|
self.buttons['delete'] = form.Button(label='Delete')
|
|
|
|
def process(self, submit):
|
|
|
|
if submit == 'approve':
|
|
|
|
count = Comment.update(
|
|
moderation_passed=True,
|
|
captcha_passed=True
|
|
).where(Comment.id << self.comments.values).execute()
|
|
flask.flash(f"Approved {count} comments.", 'success')
|
|
|
|
elif submit == 'delete':
|
|
|
|
count = Comment.delete().where(
|
|
Comment.id << self.comments.values
|
|
).execute()
|
|
flask.flash(f"Deleted {count} comments.", 'success')
|
|
|
|
return flask.redirect('')
|
|
|
|
@app.admin.route('/comment/moderation/', methods=['GET', 'POST'])
|
|
@app.admin.route('/comment/moderation/<string:filter>/', methods=['GET', 'POST'])
|
|
@rendering.page()
|
|
def comment_moderation(filter='awaiting_moderation'):
|
|
|
|
f = ModerationForm(filter)
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
redirect = f.handle()
|
|
|
|
if redirect:
|
|
return redirect
|
|
|
|
return f
|