Split the internal blueprint into multiple view files

master
kaiyou 6 years ago
parent dc4b0d21ea
commit 42c6bdb4df

@ -8,6 +8,7 @@ import flask
internal = flask.Blueprint('internal', __name__, template_folder='templates') internal = flask.Blueprint('internal', __name__, template_folder='templates')
@internal.app_errorhandler(RateLimitExceeded) @internal.app_errorhandler(RateLimitExceeded)
def rate_limit_handler(e): def rate_limit_handler(e):
response = flask.Response() response = flask.Response()
@ -17,6 +18,7 @@ def rate_limit_handler(e):
response.headers['Auth-Wait'] = '3' response.headers['Auth-Wait'] = '3'
return response return response
@limiter.request_filter @limiter.request_filter
def whitelist_webmail(): def whitelist_webmail():
try: try:
@ -26,4 +28,4 @@ def whitelist_webmail():
return False return False
from mailu.internal import views from mailu.internal.views import *

@ -1,158 +0,0 @@
from mailu import db, models, app, limiter
from mailu.internal import internal, nginx
from sqlalchemy import or_
import flask
import flask_login
import base64
import urllib
import datetime
@internal.route("/auth/email")
@limiter.limit(
app.config["AUTH_RATELIMIT"],
lambda: flask.request.headers["Client-Ip"]
)
def nginx_authentication():
""" Main authentication endpoint for Nginx email server
"""
headers = nginx.handle_authentication(flask.request.headers)
response = flask.Response()
for key, value in headers.items():
response.headers[key] = str(value)
return response
@internal.route("/auth/admin")
def admin_authentication():
""" Fails if the user is not an authenticated admin.
"""
if (not flask_login.current_user.is_anonymous
and flask_login.current_user.global_admin
and flask_login.current_user.enabled):
return ""
return flask.abort(403)
@internal.route("/auth/basic")
def basic_authentication():
""" Tries to authenticate using the Authorization header.
"""
authorization = flask.request.headers.get("Authorization")
if authorization and authorization.startswith("Basic "):
encoded = authorization.replace("Basic ", "")
user_email, password = base64.b64decode(encoded).split(b":")
user = models.User.query.get(user_email.decode("utf8"))
if user and user.enabled and user.check_password(password.decode("utf8")):
response = flask.Response()
response.headers["X-User"] = user.email
return response
response = flask.Response(status=401)
response.headers["WWW-Authenticate"] = 'Basic realm="Login Required"'
return response
@internal.route("/postfix/domain/<domain_name>")
def postfix_mailbox_domain(domain_name):
domain = models.Domain.query.get(domain_name) or flask.abort(404)
return flask.jsonify(domain.name)
@internal.route("/postfix/mailbox/<email>")
def postfix_mailbox_map(email):
user = models.User.query.get(email) or flask.abort(404)
return flask.jsonify(user.email)
@internal.route("/postfix/alias/<alias>")
def postfix_alias_map(alias):
localpart, domain = alias.split('@', 1) if '@' in alias else (None, alias)
alternative = models.Alternative.query.get(domain)
if alternative:
domain = alternative.domain_name
email = '{}@{}'.format(localpart, domain)
if localpart is None:
return flask.jsonify(domain)
else:
alias_obj = models.Alias.resolve(localpart, domain)
if alias_obj:
return flask.jsonify(",".join(alias_obj.destination))
user_obj = models.User.query.get(email)
if user_obj:
return flask.jsonify(user_obj.destination)
return flask.abort(404)
@internal.route("/postfix/spoofed/<email>")
def postfix_spoofed(email):
return flask.abort(404)
@internal.route("/postfix/transport/<email>")
def postfix_transport(email):
return flask.abort(404)
@internal.route("/dovecot/passdb/<user_email>")
def dovecot_passdb_dict(user_email):
user = models.User.query.get(user_email) or flask.abort(404)
return flask.jsonify({
"password": user.password,
})
@internal.route("/dovecot/userdb/<user_email>")
def dovecot_userdb_dict(user_email):
user = models.User.query.get(user_email) or flask.abort(404)
return flask.jsonify({
"quota_rule": "*:bytes={}".format(user.quota_bytes)
})
@internal.route("/dovecot/quota/<ns>/<user_email>", methods=["POST"])
def dovecot_quota(ns, user_email):
user = models.User.query.get(user_email) or flask.abort(404)
if ns == "storage":
user.quota_bytes_used = flask.request.get_json()
db.session.commit()
return flask.jsonify(None)
@internal.route("/dovecot/sieve/name/<script>/<user_email>")
def dovecot_sieve_name(script, user_email):
return flask.jsonify(script)
@internal.route("/dovecot/sieve/data/default/<user_email>")
def dovecot_sieve_data(user_email):
user = models.User.query.get(user_email) or flask.abort(404)
return flask.jsonify(flask.render_template("default.sieve", user=user))
@internal.route("/fetch")
def fetch_list():
return flask.jsonify([
{
"id": fetch.id,
"tls": fetch.tls,
"keep": fetch.keep,
"user_email": fetch.user_email,
"protocol": fetch.protocol,
"host": fetch.host,
"port": fetch.port,
"username": fetch.username,
"password": fetch.password
} for fetch in models.Fetch.query.all()
])
@internal.route("/fetch/<fetch_id>", methods=["POST"])
def fetch_done(fetch_id):
fetch = models.Fetch.query.get(fetch_id) or flask.abort(404)
fetch.last_check = datetime.datetime.now()
fetch.error_message = str(flask.request.get_json())
db.session.add(fetch)
db.session.commit()
return ""

@ -0,0 +1,3 @@
__all__ = [
'auth', 'postfix', 'dovecot', 'fetch'
]

@ -0,0 +1,50 @@
from mailu import db, models, app, limiter
from mailu.internal import internal, nginx
import flask
import flask_login
import base64
@internal.route("/auth/email")
@limiter.limit(
app.config["AUTH_RATELIMIT"],
lambda: flask.request.headers["Client-Ip"]
)
def nginx_authentication():
""" Main authentication endpoint for Nginx email server
"""
headers = nginx.handle_authentication(flask.request.headers)
response = flask.Response()
for key, value in headers.items():
response.headers[key] = str(value)
return response
@internal.route("/auth/admin")
def admin_authentication():
""" Fails if the user is not an authenticated admin.
"""
if (not flask_login.current_user.is_anonymous
and flask_login.current_user.global_admin
and flask_login.current_user.enabled):
return ""
return flask.abort(403)
@internal.route("/auth/basic")
def basic_authentication():
""" Tries to authenticate using the Authorization header.
"""
authorization = flask.request.headers.get("Authorization")
if authorization and authorization.startswith("Basic "):
encoded = authorization.replace("Basic ", "")
user_email, password = base64.b64decode(encoded).split(b":")
user = models.User.query.get(user_email.decode("utf8"))
if user and user.enabled and user.check_password(password.decode("utf8")):
response = flask.Response()
response.headers["X-User"] = user.email
return response
response = flask.Response(status=401)
response.headers["WWW-Authenticate"] = 'Basic realm="Login Required"'
return response

@ -0,0 +1,40 @@
from mailu import db, models
from mailu.internal import internal
import flask
@internal.route("/dovecot/passdb/<user_email>")
def dovecot_passdb_dict(user_email):
user = models.User.query.get(user_email) or flask.abort(404)
return flask.jsonify({
"password": user.password,
})
@internal.route("/dovecot/userdb/<user_email>")
def dovecot_userdb_dict(user_email):
user = models.User.query.get(user_email) or flask.abort(404)
return flask.jsonify({
"quota_rule": "*:bytes={}".format(user.quota_bytes)
})
@internal.route("/dovecot/quota/<ns>/<user_email>", methods=["POST"])
def dovecot_quota(ns, user_email):
user = models.User.query.get(user_email) or flask.abort(404)
if ns == "storage":
user.quota_bytes_used = flask.request.get_json()
db.session.commit()
return flask.jsonify(None)
@internal.route("/dovecot/sieve/name/<script>/<user_email>")
def dovecot_sieve_name(script, user_email):
return flask.jsonify(script)
@internal.route("/dovecot/sieve/data/default/<user_email>")
def dovecot_sieve_data(user_email):
user = models.User.query.get(user_email) or flask.abort(404)
return flask.jsonify(flask.render_template("default.sieve", user=user))

@ -0,0 +1,32 @@
from mailu import db, models
from mailu.internal import internal
import flask
import datetime
@internal.route("/fetch")
def fetch_list():
return flask.jsonify([
{
"id": fetch.id,
"tls": fetch.tls,
"keep": fetch.keep,
"user_email": fetch.user_email,
"protocol": fetch.protocol,
"host": fetch.host,
"port": fetch.port,
"username": fetch.username,
"password": fetch.password
} for fetch in models.Fetch.query.all()
])
@internal.route("/fetch/<fetch_id>", methods=["POST"])
def fetch_done(fetch_id):
fetch = models.Fetch.query.get(fetch_id) or flask.abort(404)
fetch.last_check = datetime.datetime.now()
fetch.error_message = str(flask.request.get_json())
db.session.add(fetch)
db.session.commit()
return ""

@ -0,0 +1,45 @@
from mailu import db, models
from mailu.internal import internal
import flask
@internal.route("/postfix/domain/<domain_name>")
def postfix_mailbox_domain(domain_name):
domain = models.Domain.query.get(domain_name) or flask.abort(404)
return flask.jsonify(domain.name)
@internal.route("/postfix/mailbox/<email>")
def postfix_mailbox_map(email):
user = models.User.query.get(email) or flask.abort(404)
return flask.jsonify(user.email)
@internal.route("/postfix/alias/<alias>")
def postfix_alias_map(alias):
localpart, domain = alias.split('@', 1) if '@' in alias else (None, alias)
alternative = models.Alternative.query.get(domain)
if alternative:
domain = alternative.domain_name
email = '{}@{}'.format(localpart, domain)
if localpart is None:
return flask.jsonify(domain)
else:
alias_obj = models.Alias.resolve(localpart, domain)
if alias_obj:
return flask.jsonify(",".join(alias_obj.destination))
user_obj = models.User.query.get(email)
if user_obj:
return flask.jsonify(user_obj.destination)
return flask.abort(404)
@internal.route("/postfix/spoofed/<email>")
def postfix_spoofed(email):
return flask.abort(404)
@internal.route("/postfix/transport/<email>")
def postfix_transport(email):
return flask.abort(404)
Loading…
Cancel
Save