first try at api using flask-restx & marshmallow

master
Alexander Graf 4 years ago
parent 4c258f5a6b
commit 6629aa3ff8

@ -41,15 +41,17 @@ def create_app_from_config(config):
)
# Import views
from mailu import ui, internal
from mailu import ui, internal, api
app.register_blueprint(ui.ui, url_prefix='/ui')
app.register_blueprint(internal.internal, url_prefix='/internal')
if app.config.get('API'):
api.register(app)
return app
def create_app():
""" Create a new application based on the config module
""" Create a new application based on the config module
"""
config = configuration.ConfigManager()
return create_app_from_config(config)

@ -0,0 +1,38 @@
from flask import redirect, url_for
# import api version(s)
from . import v1
# api
ROOT='/api'
ACTIVE=v1
# patch url for swaggerui static assets
from flask_restx.apidoc import apidoc
apidoc.static_url_path = f'{ROOT}/swaggerui'
def register(app):
# register api bluprint(s)
app.register_blueprint(v1.blueprint, url_prefix=f'{ROOT}/v{int(v1.VERSION)}')
# add redirect to current api version
@app.route(f'{ROOT}/')
def redir():
return redirect(url_for(f'{ACTIVE.blueprint.name}.root'))
# swagger ui config
app.config.SWAGGER_UI_DOC_EXPANSION = 'list'
app.config.SWAGGER_UI_OPERATION_ID = True
app.config.SWAGGER_UI_REQUEST_DURATION = True
# TODO: remove patch of static assets for debugging
import os
if 'STATIC_ASSETS' in os.environ:
app.blueprints['ui'].static_folder = os.environ['STATIC_ASSETS']
# TODO: authentication via username + password
# TODO: authentication via api token
# TODO: api access for all users (via token)
# TODO: use permissions from "manager_of"
# TODO: switch to marshmallow, as parser is deprecated. use flask_accepts?

@ -0,0 +1,8 @@
from .. import models
def fqdn_in_use(*names):
for name in names:
for model in models.Domain, models.Alternative, models.Relay:
if model.query.get(name):
return model
return None

@ -0,0 +1,27 @@
from flask import Blueprint
from flask_restx import Api, fields
VERSION = 1.0
blueprint = Blueprint(f'api_v{int(VERSION)}', __name__)
api = Api(
blueprint, version=f'{VERSION:.1f}',
title='Mailu API', default_label='Mailu',
validate=True
)
response_fields = api.model('Response', {
'code': fields.Integer,
'message': fields.String,
})
error_fields = api.model('Error', {
'errors': fields.Nested(api.model('Error_Key', {
'key': fields.String,
'message':fields.String
})),
'message': fields.String,
})
from . import domains

@ -0,0 +1,183 @@
from flask_restx import Resource, fields, abort
from . import api, response_fields, error_fields
from .. import common
from ... import models
db = models.db
dom = api.namespace('domain', description='Domain operations')
alt = api.namespace('alternative', description='Alternative operations')
domain_fields = api.model('Domain', {
'name': fields.String(description='FQDN', example='example.com', required=True),
'comment': fields.String(description='a comment'),
'max_users': fields.Integer(description='maximum number of users', min=-1, default=-1),
'max_aliases': fields.Integer(description='maximum number of aliases', min=-1, default=-1),
'max_quota_bytes': fields.Integer(description='maximum quota for mailbox', min=0),
'signup_enabled': fields.Boolean(description='allow signup'),
# 'dkim_key': fields.String,
'alternatives': fields.List(fields.String(attribute='name', description='FQDN', example='example.com')),
})
# TODO - name ist required on creation but immutable on change
# TODO - name and alteranatives need to be checked to be a fqdn (regex)
domain_parser = api.parser()
domain_parser.add_argument('max_users', type=int, help='maximum number of users')
# TODO ... add more (or use marshmallow)
alternative_fields = api.model('Domain', {
'name': fields.String(description='alternative FQDN', example='example.com', required=True),
'domain': fields.String(description='domain FQDN', example='example.com', required=True),
'dkim_key': fields.String,
})
# TODO: domain and name are not always required and can't be changed
@dom.route('')
class Domains(Resource):
@dom.doc('list_domain')
@dom.marshal_with(domain_fields, as_list=True, skip_none=True, mask=['dkim_key'])
def get(self):
""" List domains """
return models.Domain.query.all()
@dom.doc('create_domain')
@dom.expect(domain_fields)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', error_fields)
@dom.response(409, 'Duplicate domain name', error_fields)
def post(self):
""" Create a new domain """
data = api.payload
if common.fqdn_in_use(data['name']):
abort(409, f'Duplicate domain name {data["name"]!r}', errors={
'name': data['name'],
})
for item, created in models.Domain.from_dict(data):
if not created:
abort(409, f'Duplicate domain name {item.name!r}', errors={
'alternatives': item.name,
})
db.session.add(item)
db.session.commit()
@dom.route('/<name>')
class Domain(Resource):
@dom.doc('get_domain')
@dom.response(200, 'Success', domain_fields)
@dom.response(404, 'Domain not found')
@dom.marshal_with(domain_fields)
def get(self, name):
""" Find domain by name """
domain = models.Domain.query.get(name)
if not domain:
abort(404)
return domain
@dom.doc('update_domain')
@dom.expect(domain_fields)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', error_fields)
@dom.response(404, 'Domain not found')
def put(self, name):
""" Update an existing domain """
domain = models.Domain.query.get(name)
if not domain:
abort(404)
data = api.payload
data['name'] = name
for item, created in models.Domain.from_dict(data):
if created is True:
db.session.add(item)
db.session.commit()
@dom.doc('modify_domain')
@dom.expect(domain_parser)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', error_fields)
@dom.response(404, 'Domain not found')
def post(self, name=None):
""" Updates domain with form data """
domain = models.Domain.query.get(name)
if not domain:
abort(404)
data = dict(domain_parser.parse_args())
data['name'] = name
for item, created in models.Domain.from_dict(data):
if created is True:
db.session.add(item)
# TODO: flush?
db.session.commit()
@dom.doc('delete_domain')
@dom.response(200, 'Success', response_fields)
@dom.response(404, 'Domain not found')
def delete(self, name=None):
""" Delete domain """
domain = models.Domain.query.get(name)
if not domain:
abort(404)
db.session.delete(domain)
db.session.commit()
# @dom.route('/<name>/alternative')
# @alt.route('')
# class Alternatives(Resource):
# @alt.doc('alternatives_list')
# @alt.marshal_with(alternative_fields, as_list=True, skip_none=True, mask=['dkim_key'])
# def get(self, name=None):
# """ List alternatives (of domain) """
# if name is None:
# return models.Alternative.query.all()
# else:
# return models.Alternative.query.filter_by(domain_name = name).all()
# @alt.doc('alternative_create')
# @alt.expect(alternative_fields)
# @alt.response(200, 'Success', response_fields)
# @alt.response(400, 'Input validation exception', error_fields)
# @alt.response(404, 'Domain not found')
# @alt.response(409, 'Duplicate domain name', error_fields)
# def post(self, name=None):
# """ Create new alternative (for domain) """
# # abort(501)
# data = api.payload
# if name is not None:
# data['name'] = name
# domain = models.Domain.query.get(name)
# if not domain:
# abort(404)
# if common.fqdn_in_use(data['name']):
# abort(409, f'Duplicate domain name {data["name"]!r}', errors={
# 'name': data['name'],
# })
# for item, created in models.Alternative.from_dict(data):
# # TODO: handle creation of domain
# if not created:
# abort(409, f'Duplicate domain name {item.name!r}', errors={
# 'alternatives': item.name,
# })
# # db.session.add(item)
# # db.session.commit()
# @dom.route('/<name>/alternative/<alt>')
# @alt.route('/<name>')
# class Alternative(Resource):
# def get(self, name, alt=None):
# """ Find alternative (of domain) """
# abort(501)
# def put(self, name, alt=None):
# """ Update alternative (of domain) """
# abort(501)
# def post(self, name, alt=None):
# """ Update alternative (of domain) with form data """
# abort(501)
# def delete(self, name, alt=None):
# """ Delete alternative (for domain) """
# abort(501)

@ -1,4 +1,5 @@
from mailu import models
from .schemas import schemas
from flask import current_app as app
from flask.cli import FlaskGroup, with_appcontext
@ -320,24 +321,30 @@ def config_dump(full=False, secrets=False, dns=False, sections=None):
return super().increase_indent(flow, False)
if sections:
check = dict(yaml_sections)
for section in sections:
if section not in check:
if section not in schemas:
print(f'[ERROR] Invalid section: {section}')
return 1
else:
sections = sorted(schemas.keys())
extra = []
if dns:
extra.append('dns')
# TODO: create World Schema and dump only this with Word.dumps ?
config = {}
for section, model in yaml_sections:
if not sections or section in sections:
dump = [item.to_dict(full, secrets, extra) for item in model.query.all()]
if len(dump):
config[section] = dump
yaml.dump(config, sys.stdout, Dumper=spacedDumper, default_flow_style=False, allow_unicode=True)
for section in sections:
schema = schemas[section](many=True)
schema.context.update({
'full': full,
'secrets': secrets,
'dns': dns,
})
yaml.dump(
{section: schema.dump(schema.Meta.model.query.all())},
sys.stdout,
Dumper=spacedDumper,
default_flow_style=False,
allow_unicode=True
)
sys.stdout.write('\n')
@mailu.command()

@ -0,0 +1,222 @@
import marshmallow
import flask_marshmallow
from . import models
ma = flask_marshmallow.Marshmallow()
class BaseSchema(ma.SQLAlchemyAutoSchema):
SKIP_IF = {
'comment': {'', None}
}
@marshmallow.post_dump
def remove_skip_values(self, data, many, **kwargs):
print(repr(data), self.context)
return {
key: value for key, value in data.items()
if key not in self.SKIP_IF or value not in self.SKIP_IF[key]
}
class BaseMeta:
exclude = ['created_at', 'updated_at']
class DomainSchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = models.Domain
# _dict_hide = {'users', 'managers', 'aliases'}
# _dict_show = {'dkim_key'}
# _dict_extra = {'dns':{'dkim_publickey', 'dns_mx', 'dns_spf', 'dns_dkim', 'dns_dmarc'}}
# _dict_secret = {'dkim_key'}
# _dict_types = {
# 'dkim_key': (bytes, type(None)),
# 'dkim_publickey': False,
# 'dns_mx': False,
# 'dns_spf': False,
# 'dns_dkim': False,
# 'dns_dmarc': False,
# }
# _dict_output = {'dkim_key': lambda key: key.decode('utf-8').strip().split('\n')[1:-1]}
# @staticmethod
# def _dict_input(data):
# if 'dkim_key' in data:
# key = data['dkim_key']
# if key is not None:
# if type(key) is list:
# key = ''.join(key)
# if type(key) is str:
# key = ''.join(key.strip().split()) # removes all whitespace
# if key == 'generate':
# data['dkim_key'] = dkim.gen_key()
# elif key:
# m = re.match('^-----BEGIN (RSA )?PRIVATE KEY-----', key)
# if m is not None:
# key = key[m.end():]
# m = re.search('-----END (RSA )?PRIVATE KEY-----$', key)
# if m is not None:
# key = key[:m.start()]
# key = '\n'.join(wrap(key, 64))
# key = f'-----BEGIN PRIVATE KEY-----\n{key}\n-----END PRIVATE KEY-----\n'.encode('ascii')
# try:
# dkim.strip_key(key)
# except:
# raise ValueError('invalid dkim key')
# else:
# data['dkim_key'] = key
# else:
# data['dkim_key'] = None
# name = db.Column(IdnaDomain, primary_key=True, nullable=False)
# managers = db.relationship('User', secondary=managers,
# backref=db.backref('manager_of'), lazy='dynamic')
# max_users = db.Column(db.Integer, nullable=False, default=-1)
# max_aliases = db.Column(db.Integer, nullable=False, default=-1)
# max_quota_bytes = db.Column(db.BigInteger(), nullable=False, default=0)
# signup_enabled = db.Column(db.Boolean(), nullable=False, default=False)
class UserSchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = models.User
# _dict_hide = {'domain_name', 'domain', 'localpart', 'quota_bytes_used'}
# _dict_mandatory = {'localpart', 'domain', 'password'}
# @classmethod
# def _dict_input(cls, data):
# Email._dict_input(data)
# # handle password
# if 'password' in data:
# if 'password_hash' in data or 'hash_scheme' in data:
# raise ValueError('ambigous key password and password_hash/hash_scheme')
# # check (hashed) password
# password = data['password']
# if password.startswith('{') and '}' in password:
# scheme = password[1:password.index('}')]
# if scheme not in cls.scheme_dict:
# raise ValueError(f'invalid password scheme {scheme!r}')
# else:
# raise ValueError(f'invalid hashed password {password!r}')
# elif 'password_hash' in data and 'hash_scheme' in data:
# if data['hash_scheme'] not in cls.scheme_dict:
# raise ValueError(f'invalid password scheme {scheme!r}')
# data['password'] = '{'+data['hash_scheme']+'}'+ data['password_hash']
# del data['hash_scheme']
# del data['password_hash']
# domain = db.relationship(Domain,
# backref=db.backref('users', cascade='all, delete-orphan'))
# password = db.Column(db.String(255), nullable=False)
# quota_bytes = db.Column(db.BigInteger(), nullable=False, default=10**9)
# quota_bytes_used = db.Column(db.BigInteger(), nullable=False, default=0)
# global_admin = db.Column(db.Boolean(), nullable=False, default=False)
# enabled = db.Column(db.Boolean(), nullable=False, default=True)
# # Features
# enable_imap = db.Column(db.Boolean(), nullable=False, default=True)
# enable_pop = db.Column(db.Boolean(), nullable=False, default=True)
# # Filters
# forward_enabled = db.Column(db.Boolean(), nullable=False, default=False)
# forward_destination = db.Column(CommaSeparatedList(), nullable=True, default=[])
# forward_keep = db.Column(db.Boolean(), nullable=False, default=True)
# reply_enabled = db.Column(db.Boolean(), nullable=False, default=False)
# reply_subject = db.Column(db.String(255), nullable=True, default=None)
# reply_body = db.Column(db.Text(), nullable=True, default=None)
# reply_startdate = db.Column(db.Date, nullable=False,
# default=date(1900, 1, 1))
# reply_enddate = db.Column(db.Date, nullable=False,
# default=date(2999, 12, 31))
# # Settings
# displayed_name = db.Column(db.String(160), nullable=False, default='')
# spam_enabled = db.Column(db.Boolean(), nullable=False, default=True)
# spam_threshold = db.Column(db.Integer(), nullable=False, default=80)
class AliasSchema(BaseSchema):
class Meta(BaseMeta):
model = models.Alias
exclude = BaseMeta.exclude + ['localpart']
# TODO look for good way to exclude secrets, unverbose and defaults
# _dict_hide = {'domain_name', 'domain', 'localpart'}
# @staticmethod
# def _dict_input(data):
# Email._dict_input(data)
# # handle comma delimited string for backwards compability
# dst = data.get('destination')
# if type(dst) is str:
# data['destination'] = list([adr.strip() for adr in dst.split(',')])
class TokenSchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = models.Token
# _dict_recurse = True
# _dict_hide = {'user', 'user_email'}
# _dict_mandatory = {'password'}
# id = db.Column(db.Integer(), primary_key=True)
# user_email = db.Column(db.String(255), db.ForeignKey(User.email),
# nullable=False)
# user = db.relationship(User,
# backref=db.backref('tokens', cascade='all, delete-orphan'))
# password = db.Column(db.String(255), nullable=False)
# ip = db.Column(db.String(255))
class FetchSchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = models.Fetch
# _dict_recurse = True
# _dict_hide = {'user_email', 'user', 'last_check', 'error'}
# _dict_mandatory = {'protocol', 'host', 'port', 'username', 'password'}
# _dict_secret = {'password'}
# id = db.Column(db.Integer(), primary_key=True)
# user_email = db.Column(db.String(255), db.ForeignKey(User.email),
# nullable=False)
# user = db.relationship(User,
# backref=db.backref('fetches', cascade='all, delete-orphan'))
# protocol = db.Column(db.Enum('imap', 'pop3'), nullable=False)
# host = db.Column(db.String(255), nullable=False)
# port = db.Column(db.Integer(), nullable=False)
# tls = db.Column(db.Boolean(), nullable=False, default=False)
# username = db.Column(db.String(255), nullable=False)
# password = db.Column(db.String(255), nullable=False)
# keep = db.Column(db.Boolean(), nullable=False, default=False)
# last_check = db.Column(db.DateTime, nullable=True)
# error = db.Column(db.String(1023), nullable=True)
class ConfigSchema(ma.SQLAlchemySchema):
class Meta:
model = models.Config
# created_at = ma.auto_field(dump_only=True)
# updated_at = ma.auto_field(dump_only=True)
comment = ma.auto_field()
name = ma.auto_field(required=True)
value = ma.auto_field(required=True)
class RelaySchema(BaseSchema):
class Meta(BaseMeta):
model = models.Relay
# created_at = ma.auto_field(dump_only=True)
# updated_at = ma.auto_field(dump_only=True)
# comment = ma.auto_field()
# name = ma.auto_field(required=True)
# smtp = ma.auto_field(required=True)
schemas = {
'domains': DomainSchema,
'relays': RelaySchema,
'users': UserSchema,
'aliases': AliasSchema,
# 'config': ConfigSchema,
}
Loading…
Cancel
Save