You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

94 lines
3.8 KiB

from mailu import utils
from flask import current_app as app
import base64
import limits
import limits.strategies
import hmac
import secrets
class LimitWrapper(object):
""" Wraps a limit by providing the storage, item and identifiers
def __init__(self, limiter, limit, *identifiers):
self.limiter = limiter
self.limit = limit
self.base_identifiers = identifiers
def test(self, *args):
return self.limiter.test(self.limit, *(self.base_identifiers + args))
def hit(self, *args):
return self.limiter.hit(self.limit, *(self.base_identifiers + args))
def get_window_stats(self, *args):
return self.limiter.get_window_stats(self.limit, *(self.base_identifiers + args))
class LimitWraperFactory(object):
""" Global limiter, to be used as a factory
def init_app(self, app): =["RATELIMIT_STORAGE_URL"])
self.limiter = limits.strategies.MovingWindowRateLimiter(
def get_limiter(self, limit, *args):
return LimitWrapper(self.limiter, limits.parse(limit), *args)
def is_subject_to_rate_limits(self, ip):
3 years ago
return False if utils.is_exempt_from_ratelimits(ip) else not ('exempt-{ip}') > 0)
def exempt_ip_from_ratelimits(self, ip):'exempt-{ip}', app.config["AUTH_RATELIMIT_EXEMPTION_LENGTH"], True)
def should_rate_limit_ip(self, ip):
limiter = self.get_limiter(app.config["AUTH_RATELIMIT_IP"], 'auth-ip')
client_network = utils.extract_network_from_ip(ip)
is_rate_limited = self.is_subject_to_rate_limits(ip) and not limiter.test(client_network)
if is_rate_limited:
app.logger.warn(f'Authentication attempt from {ip} has been rate-limited.')
return is_rate_limited
def rate_limit_ip(self, ip):
limiter = self.get_limiter(app.config["AUTH_RATELIMIT_IP"], 'auth-ip')
client_network = utils.extract_network_from_ip(ip)
if self.is_subject_to_rate_limits(ip):
def should_rate_limit_user(self, username, ip, device_cookie=None, device_cookie_name=None):
limiter = self.get_limiter(app.config["AUTH_RATELIMIT_USER"], 'auth-user')
is_rate_limited = self.is_subject_to_rate_limits(ip) and not limiter.test(device_cookie if device_cookie_name == username else username)
if is_rate_limited:
app.logger.warn(f'Authentication attempt from {ip} for {username} has been rate-limited.')
return is_rate_limited
def rate_limit_user(self, username, ip, device_cookie=None, device_cookie_name=None):
limiter = self.get_limiter(app.config["AUTH_RATELIMIT_USER"], 'auth-user')
if self.is_subject_to_rate_limits(ip):
limiter.hit(device_cookie if device_cookie_name == username else username)
""" Device cookies as described on:
def parse_device_cookie(self, cookie):
login, nonce, _ = cookie.split('$')
if hmac.compare_digest(cookie, self.device_cookie(login, nonce)):
return nonce, login
return None, None
""" Device cookies don't require strong crypto:
72bits of nonce, 96bits of signature is more than enough
and these values avoid padding in most cases
def device_cookie(self, username, nonce=None):
if not nonce:
nonce = secrets.token_urlsafe(9)
sig = str(base64.urlsafe_b64encode(, bytearray(f'device_cookie|{username}|{nonce}', 'utf-8'), 'sha256').digest()[20:]), 'utf-8')
return f'{username}${nonce}${sig}'