Refactor auth under nginx.check_credentials()

master
Florent Daigniere 4 years ago
parent f9ed517b39
commit df230cb482

@ -19,6 +19,20 @@ STATUSES = {
}), }),
} }
def check_credentials(user, password, ip, protocol=None):
if not user or not user.enabled or (protocol == "imap" and not user.enable_imap) or (protocol == "pop3" and not user.enable_pop):
return False
is_ok = False
# All tokens are 32 characters hex lowercase
if len(password) == 32:
for token in user.tokens:
if (token.check_password(password) and
(not token.ip or token.ip == ip)):
is_ok = True
break
if not is_ok and user.check_password(password):
is_ok = True
return is_ok
def handle_authentication(headers): def handle_authentication(headers):
""" Handle an HTTP nginx authentication request """ Handle an HTTP nginx authentication request
@ -47,23 +61,7 @@ def handle_authentication(headers):
password = raw_password.encode("iso8859-1").decode("utf8") password = raw_password.encode("iso8859-1").decode("utf8")
ip = urllib.parse.unquote(headers["Client-Ip"]) ip = urllib.parse.unquote(headers["Client-Ip"])
user = models.User.query.get(user_email) user = models.User.query.get(user_email)
status = False if check_credentials(user, password, ip, protocol):
if user:
# All tokens are 32 characters hex lowercase
if len(password) == 32:
for token in user.tokens:
if (token.check_password(password) and
(not token.ip or token.ip == ip)):
status = True
break
if not status and user.check_password(password):
status = True
if status:
if protocol == "imap" and not user.enable_imap:
status = False
elif protocol == "pop3" and not user.enable_pop:
status = False
if status and user.enabled:
return { return {
"Auth-Status": "OK", "Auth-Status": "OK",
"Auth-Server": server, "Auth-Server": server,

@ -53,19 +53,7 @@ def basic_authentication():
encoded = authorization.replace("Basic ", "") encoded = authorization.replace("Basic ", "")
user_email, password = base64.b64decode(encoded).split(b":") user_email, password = base64.b64decode(encoded).split(b":")
user = models.User.query.get(user_email.decode("utf8")) user = models.User.query.get(user_email.decode("utf8"))
if user and user.enabled: if nginx.check_credentials(user, password.decode('utf-8'), flask.request.remote_addr, "web"):
password = password.decode('utf-8')
status = False
# All tokens are 32 characters hex lowercase
if len(password) == 32:
for token in user.tokens:
if (token.check_password(password) and
(not token.ip or token.ip == ip)):
status = True
break
if not status and user.check_password(password):
status = True
if status:
response = flask.Response() response = flask.Response()
response.headers["X-User"] = user.email response.headers["X-User"] = user.email
return response return response

Loading…
Cancel
Save