Merge pull request #55 from kaiyou/feat-refactor-permissions

Refactor the access control code
master
kaiyou 9 years ago committed by GitHub
commit cbc6bb5dd6

@ -0,0 +1,43 @@
from freeposte import app
import sys
import tabulate
# Known endpoints without permissions
known_missing_permissions = [
"index",
"static", "bootstrap.static",
"admin.static", "admin.login"
]
# Compute the permission table
missing_permissions = []
permissions = {}
for endpoint, function in app.view_functions.items():
audit = function.__dict__.get("_audit_permissions")
if audit:
handler, args = audit
if args:
model = args[0].__name__
key = args[1]
else:
model = key = None
permissions[endpoint] = [endpoint, handler.__name__, model, key]
elif endpoint not in known_missing_permissions:
missing_permissions.append(endpoint)
# Fail if any endpoint is missing a permission check
if missing_permissions:
print("The following endpoints are missing permission checks:")
print(missing_permissions.join(","))
sys.exit(1)
# Display the permissions table
print(tabulate.tabulate([
[route, *permissions[route.endpoint]]
for route in app.url_map.iter_rules() if route.endpoint in permissions
]))

@ -0,0 +1,114 @@
from freeposte.admin import db, models, forms
import flask
import flask_login
import functools
def permissions_wrapper(handler):
""" Decorator that produces a decorator for checking permissions.
"""
def callback(function, args, kwargs, dargs, dkwargs):
authorized = handler(args, kwargs, *dargs, **dkwargs)
if not authorized:
flask.abort(403)
elif type(authorized) is int:
flask.abort(authorized)
else:
return function(*args, **kwargs)
# If the handler has no argument, declare a
# simple decorator, otherwise declare a nested decorator
# There are at least two mandatory arguments
if handler.__code__.co_argcount > 2:
def decorator(*dargs, **dkwargs):
def inner(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
return callback(function, args, kwargs, dargs, dkwargs)
wrapper._audit_permissions = handler, dargs
return flask_login.login_required(wrapper)
return inner
else:
def decorator(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
return callback(function, args, kwargs, (), {})
wrapper._audit_permissions = handler, []
return flask_login.login_required(wrapper)
return decorator
@permissions_wrapper
def global_admin(args, kwargs):
""" The view is only available to global administrators.
"""
return flask_login.current_user.global_admin
@permissions_wrapper
def domain_admin(args, kwargs, model, key):
""" The view is only available to specific domain admins.
Global admins will still be able to access the resource.
A model and key must be provided. The model will be queries
based on the query parameter named after the key. The model may
either be Domain or an Email subclass (or any class with a
``domain`` attribute which stores a related Domain instance).
"""
obj = model.query.get(kwargs[key])
if not obj:
flask.abort(404)
else:
domain = obj if type(obj) is models.Domain else obj.domain
return domain in flask_login.current_user.get_managed_domains()
@permissions_wrapper
def owner(args, kwargs, model, key):
""" The view is only available to the resource owner or manager.
A model and key must be provided. The model will be queries
based on the query parameter named after the key. The model may
either be User or any model with a ``user`` attribute storing
a user instance (like Fetch).
If the query parameter is empty and the model is User, then
the resource being accessed is supposed to be the current
logged in user and access is obviously authorized.
"""
if kwargs[key] is None and model == models.User:
return True
obj = model.query.get(kwargs[key])
if not obj:
flask.abort(404)
else:
user = obj if type(obj) is models.User else obj.user
return (
user.email == flask_login.current_user.email
or user.domain in flask_login.current_user.get_managed_domains()
)
@permissions_wrapper
def authenticated(args, kwargs):
""" The view is only available to logged in users.
"""
return True
def confirmation_required(action):
""" View decorator that asks for a confirmation first.
"""
def inner(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
form = forms.ConfirmationForm()
if form.validate_on_submit():
return function(*args, **kwargs)
return flask.render_template(
"confirm.html", action=action.format(*args, **kwargs),
form=form
)
return wrapper
return inner

@ -22,7 +22,7 @@ Manager list
{% for manager in domain.managers %}
<tr>
<td>
<a href="{{ url_for('.manager_delete', manager=manager.email) }}" title="Delete"><i class="fa fa-trash"></i></a>
<a href="{{ url_for('.manager_delete', domain_name=domain.name, user_email=manager.email) }}" title="Delete"><i class="fa fa-trash"></i></a>
</td>
<td>{{ manager }}</td>
</tr>

@ -1,69 +0,0 @@
from freeposte.admin import models, forms
import flask
import flask_login
import functools
def confirmation_required(action):
""" View decorator that asks for a confirmation first.
"""
def inner(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
form = forms.ConfirmationForm()
if form.validate_on_submit():
return function(*args, **kwargs)
return flask.render_template(
"confirm.html", action=action.format(*args, **kwargs),
form=form
)
return wrapper
return inner
def get_domain_admin(domain_name):
domain = models.Domain.query.get(domain_name)
if not domain:
flask.abort(404)
if not domain in flask_login.current_user.get_managed_domains():
flask.abort(403)
return domain
def require_global_admin():
if not flask_login.current_user.global_admin:
flask.abort(403)
def get_user(user_email, admin=False):
if user_email is None:
user_email = flask_login.current_user.email
user = models.User.query.get(user_email)
if not user:
flask.abort(404)
if not user.domain in flask_login.current_user.get_managed_domains():
if admin:
flask.abort(403)
elif not user.email == flask_login.current_user.email:
flask.abort(403)
return user
def get_alias(alias):
alias = models.Alias.query.get(alias)
if not alias:
flask.abort(404)
if not alias.domain in flask_login.current_user.get_managed_domains():
return 403
return alias
def get_fetch(fetch_id):
fetch = models.Fetch.query.get(fetch_id)
if not fetch:
flask.abort(404)
if not fetch.user.domain in flask_login.current_user.get_managed_domains():
if not fetch.user.email == flask_login.current_user.email:
flask.abort(403)
return fetch

@ -1,24 +1,19 @@
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
import os
import pprint
import flask
import flask_login
import json
@app.route('/admin/list', methods=['GET'])
@flask_login.login_required
@access.global_admin
def admin_list():
utils.require_global_admin()
admins = models.User.query.filter_by(global_admin=True)
return flask.render_template('admin/list.html', admins=admins)
@app.route('/admin/create', methods=['GET', 'POST'])
@flask_login.login_required
@access.global_admin
def admin_create():
utils.require_global_admin()
form = forms.AdminForm()
form.admin.choices = [
(user.email, user.email)
@ -38,10 +33,9 @@ def admin_create():
@app.route('/admin/delete/<admin>', methods=['GET', 'POST'])
@utils.confirmation_required("delete admin {admin}")
@flask_login.login_required
@access.global_admin
@access.confirmation_required("delete admin {admin}")
def admin_delete(admin):
utils.require_global_admin()
user = models.User.query.get(admin)
if user:
user.global_admin = False

@ -1,22 +1,20 @@
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
import os
import flask
import flask_login
import wtforms_components
@app.route('/alias/list/<domain_name>', methods=['GET'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def alias_list(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
return flask.render_template('alias/list.html', domain=domain)
@app.route('/alias/create/<domain_name>', methods=['GET', 'POST'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def alias_create(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
if domain.max_aliases and len(domain.aliases) >= domain.max_aliases:
flask.flash('Too many aliases for domain %s' % domain, 'error')
return flask.redirect(
@ -38,9 +36,9 @@ def alias_create(domain_name):
@app.route('/alias/edit/<alias>', methods=['GET', 'POST'])
@flask_login.login_required
@access.domain_admin(models.Alias, 'alias')
def alias_edit(alias):
alias = utils.get_alias(alias)
alias = models.Alias.query.get(alias) or flask.abort(404)
form = forms.AliasForm(obj=alias)
wtforms_components.read_only(form.localpart)
if form.validate_on_submit():
@ -54,10 +52,10 @@ def alias_edit(alias):
@app.route('/alias/delete/<alias>', methods=['GET', 'POST'])
@utils.confirmation_required("delete {alias}")
@flask_login.login_required
@access.domain_admin(models.Alias, 'alias')
@access.confirmation_required("delete {alias}")
def alias_delete(alias):
alias = utils.get_alias(alias)
alias = models.Alias.query.get(alias) or flask.abort(404)
db.session.delete(alias)
db.session.commit()
flask.flash('Alias %s deleted' % alias)

@ -1,13 +1,12 @@
from freeposte import dockercli
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
import os
import flask
import flask_login
@app.route('/', methods=["GET"])
@flask_login.login_required
@access.authenticated
def index():
return flask.redirect(flask.url_for('.user_settings'))
@ -26,16 +25,15 @@ def login():
@app.route('/logout', methods=['GET'])
@flask_login.login_required
@access.authenticated
def logout():
flask_login.logout_user()
return flask.redirect(flask.url_for('.index'))
@app.route('/services', methods=['GET'])
@flask_login.login_required
@access.global_admin
def services():
utils.require_global_admin()
containers = {}
for brief in dockercli.containers(all=True):
if brief['Image'].startswith('freeposte/'):

@ -1,22 +1,19 @@
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
from freeposte import app as flask_app
import os
import flask
import flask_login
import wtforms_components
@app.route('/domain', methods=['GET'])
@flask_login.login_required
@access.authenticated
def domain_list():
return flask.render_template('domain/list.html')
@app.route('/domain/create', methods=['GET', 'POST'])
@flask_login.login_required
@access.global_admin
def domain_create():
utils.require_global_admin()
form = forms.DomainForm()
if form.validate_on_submit():
if models.Domain.query.get(form.name.data):
@ -32,10 +29,9 @@ def domain_create():
@app.route('/domain/edit/<domain_name>', methods=['GET', 'POST'])
@flask_login.login_required
@access.global_admin
def domain_edit(domain_name):
utils.require_global_admin()
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
form = forms.DomainForm(obj=domain)
wtforms_components.read_only(form.name)
if form.validate_on_submit():
@ -48,11 +44,10 @@ def domain_edit(domain_name):
@app.route('/domain/delete/<domain_name>', methods=['GET', 'POST'])
@utils.confirmation_required("delete {domain_name}")
@flask_login.login_required
@access.global_admin
@access.confirmation_required("delete {domain_name}")
def domain_delete(domain_name):
utils.require_global_admin()
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
db.session.delete(domain)
db.session.commit()
flask.flash('Domain %s deleted' % domain)
@ -60,18 +55,18 @@ def domain_delete(domain_name):
@app.route('/domain/details/<domain_name>', methods=['GET'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def domain_details(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
return flask.render_template('domain/details.html', domain=domain,
config=flask_app.config)
@app.route('/domain/genkeys/<domain_name>', methods=['GET', 'POST'])
@utils.confirmation_required("regenerate keys for {domain_name}")
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
@access.confirmation_required("regenerate keys for {domain_name}")
def domain_genkeys(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
domain.generate_dkim_key()
return flask.redirect(
flask.url_for(".domain_details", domain_name=domain_name))

@ -1,24 +1,24 @@
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
import os
import flask
import flask_login
import wtforms_components
@app.route('/fetch/list', methods=['GET', 'POST'], defaults={'user_email': None})
@app.route('/fetch/list/<user_email>', methods=['GET'])
@flask_login.login_required
@access.owner(models.User, 'user_email')
def fetch_list(user_email):
user = utils.get_user(user_email)
user_email = user_email or flask_login.current_user.email
user = models.User.query.get(user_email) or flask.abort(404)
return flask.render_template('fetch/list.html', user=user)
@app.route('/fetch/create', methods=['GET', 'POST'], defaults={'user_email': None})
@app.route('/fetch/create/<user_email>', methods=['GET', 'POST'])
@flask_login.login_required
@access.owner(models.User, 'user_email')
def fetch_create(user_email):
user = utils.get_user(user_email)
user_email = user_email or flask_login.current_user.email
user = models.User.query.get(user_email) or flask.abort(404)
form = forms.FetchForm()
if form.validate_on_submit():
fetch = models.Fetch(user=user)
@ -32,9 +32,9 @@ def fetch_create(user_email):
@app.route('/fetch/edit/<fetch_id>', methods=['GET', 'POST'])
@flask_login.login_required
@access.owner(models.Fetch, 'fetch_id')
def fetch_edit(fetch_id):
fetch = utils.get_fetch(fetch_id)
fetch = models.Fetch.query.get(fetch_id) or flask.abort(404)
form = forms.FetchForm(obj=fetch)
if form.validate_on_submit():
form.populate_obj(fetch)
@ -47,10 +47,10 @@ def fetch_edit(fetch_id):
@app.route('/fetch/delete/<fetch_id>', methods=['GET', 'POST'])
@utils.confirmation_required("delete a fetched account")
@flask_login.login_required
@access.confirmation_required("delete a fetched account")
@access.owner(models.Fetch, 'fetch_id')
def fetch_delete(fetch_id):
fetch = utils.get_fetch(fetch_id)
fetch = models.Fetch.query.get(fetch_id) or flask.abort(404)
db.session.delete(fetch)
db.session.commit()
flask.flash('Fetch configuration delete')

@ -1,30 +1,31 @@
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
import os
import flask
import flask_login
import wtforms_components
@app.route('/manager/list/<domain_name>', methods=['GET'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def manager_list(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
return flask.render_template('manager/list.html', domain=domain)
@app.route('/manager/create/<domain_name>', methods=['GET', 'POST'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def manager_create(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
form = forms.ManagerForm()
available_users = flask_login.current_user.get_managed_emails(
include_aliases=False)
form.manager.choices = [
(user.email, user.email) for user in
flask_login.current_user.get_managed_emails(include_aliases=False)
(user.email, user.email) for user in available_users
]
if form.validate_on_submit():
user = utils.get_user(form.manager.data, admin=True)
if user in domain.managers:
user = models.User.query.get(form.manager.data)
if user not in available_users:
flask.abort(403)
elif user in domain.managers:
flask.flash('User %s is already manager' % user, 'error')
else:
domain.managers.append(user)
@ -36,12 +37,12 @@ def manager_create(domain_name):
domain=domain, form=form)
@app.route('/manager/delete/<manager>', methods=['GET', 'POST'])
@utils.confirmation_required("remove manager {manager}")
@flask_login.login_required
def manager_delete(manager):
user = utils.get_user(manager, admin=True)
domain = utils.get_domain_admin(user.domain_name)
@app.route('/manager/delete/<domain_name>/<user_email>', methods=['GET', 'POST'])
@access.confirmation_required("remove manager {user_email}")
@access.domain_admin(models.Domain, 'domain_name')
def manager_delete(domain_name, user_email):
domain = models.Domain.query.get(domain_name) or flask.abort(404)
user = models.User.query.get(user_email) or flask.abort(404)
if user in domain.managers:
domain.managers.remove(user)
db.session.commit()
@ -49,4 +50,4 @@ def manager_delete(manager):
else:
flask.flash('User %s is not manager' % user, 'error')
return flask.redirect(
flask.url_for('.manager_list', domain_name=domain.name))
flask.url_for('.manager_list', domain_name=domain_name))

@ -1,22 +1,21 @@
from freeposte.admin import app, db, models, forms, utils
from freeposte.admin import app, db, models, forms, access
import os
import flask
import flask_login
import wtforms_components
@app.route('/user/list/<domain_name>', methods=['GET'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def user_list(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
return flask.render_template('user/list.html', domain=domain)
@app.route('/user/create/<domain_name>', methods=['GET', 'POST'])
@flask_login.login_required
@access.domain_admin(models.Domain, 'domain_name')
def user_create(domain_name):
domain = utils.get_domain_admin(domain_name)
domain = models.Domain.query.get(domain_name) or flask.abort(404)
if domain.max_users and len(domain.users) >= domain.max_users:
flask.flash('Too many users for domain %s' % domain, 'error')
return flask.redirect(
@ -39,9 +38,9 @@ def user_create(domain_name):
@app.route('/user/edit/<user_email>', methods=['GET', 'POST'])
@flask_login.login_required
@access.domain_admin(models.User, 'user_email')
def user_edit(user_email):
user = utils.get_user(user_email, True)
user = models.User.query.get(user_email) or flask.abort(404)
form = forms.UserForm(obj=user)
wtforms_components.read_only(form.localpart)
form.pw.validators = []
@ -57,10 +56,10 @@ def user_edit(user_email):
@app.route('/user/delete/<user_email>', methods=['GET', 'POST'])
@utils.confirmation_required("delete {user_email}")
@flask_login.login_required
@access.domain_admin(models.User, 'user_email')
@access.confirmation_required("delete {user_email}")
def user_delete(user_email):
user = utils.get_user(user_email, True)
user = models.User.query.get(user_email) or flask.abort(404)
db.session.delete(user)
db.session.commit()
flask.flash('User %s deleted' % user)
@ -70,9 +69,10 @@ def user_delete(user_email):
@app.route('/user/settings', methods=['GET', 'POST'], defaults={'user_email': None})
@app.route('/user/usersettings/<user_email>', methods=['GET', 'POST'])
@flask_login.login_required
@access.owner(models.User, 'user_email')
def user_settings(user_email):
user = utils.get_user(user_email)
user_email = user_email or flask_login.current_user.email
user = models.User.query.get(user_email) or flask.abort(404)
form = forms.UserSettingsForm(obj=user)
if form.validate_on_submit():
form.populate_obj(user)
@ -86,9 +86,10 @@ def user_settings(user_email):
@app.route('/user/password', methods=['GET', 'POST'], defaults={'user_email': None})
@app.route('/user/password/<user_email>', methods=['GET', 'POST'])
@flask_login.login_required
@access.owner(models.User, 'user_email')
def user_password(user_email):
user = utils.get_user(user_email)
user_email = user_email or flask_login.current_user.email
user = models.User.query.get(user_email) or flask.abort(404)
form = forms.UserPasswordForm()
if form.validate_on_submit():
if form.pw.data != form.pw2.data:
@ -105,9 +106,10 @@ def user_password(user_email):
@app.route('/user/forward', methods=['GET', 'POST'], defaults={'user_email': None})
@app.route('/user/forward/<user_email>', methods=['GET', 'POST'])
@flask_login.login_required
@access.owner(models.User, 'user_email')
def user_forward(user_email):
user = utils.get_user(user_email)
user_email = user_email or flask_login.current_user.email
user = models.User.query.get(user_email) or flask.abort(404)
form = forms.UserForwardForm(obj=user)
if form.validate_on_submit():
form.populate_obj(user)
@ -121,9 +123,10 @@ def user_forward(user_email):
@app.route('/user/reply', methods=['GET', 'POST'], defaults={'user_email': None})
@app.route('/user/reply/<user_email>', methods=['GET', 'POST'])
@flask_login.login_required
@access.owner(models.User, 'user_email')
def user_reply(user_email):
user = utils.get_user(user_email)
user_email = user_email or flask_login.current_user.email
user = models.User.query.get(user_email) or flask.abort(404)
form = forms.UserReplyForm(obj=user)
if form.validate_on_submit():
form.populate_obj(user)

@ -10,3 +10,4 @@ PyOpenSSL
passlib
gunicorn
docker-py
tabulate

Loading…
Cancel
Save