Add support for dovecot dict_set operations

main
Pierre Jaury 6 years ago committed by Alexander Graf
parent eb6b1866f1
commit c5fa0280a0
No known key found for this signature in database
GPG Key ID: B8A9DC143E075629

@ -5,6 +5,7 @@ It is able to proxify postfix maps and dovecot dicts to any table
import asyncio import asyncio
import logging import logging
import sys
from podop import postfix, dovecot, table from podop import postfix, dovecot, table
@ -19,7 +20,7 @@ TABLE_TYPES = dict(
) )
def run_server(server_type, socket, tables): def run_server(verbosity, server_type, socket, tables):
""" Run the server, given its type, socket path and table list """ Run the server, given its type, socket path and table list
The table list must be a list of tuples (name, type, param) The table list must be a list of tuples (name, type, param)
@ -30,7 +31,8 @@ def run_server(server_type, socket, tables):
for name, table_type, param in tables for name, table_type, param in tables
} }
# Run the main loop # Run the main loop
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(stream=sys.stderr, level=max(3 - verbosity, 0) * 10,
format='%(name)s (%(levelname)s): %(message)s')
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
server = loop.run_until_complete(loop.create_unix_server( server = loop.run_until_complete(loop.create_unix_server(
SERVER_TYPES[server_type].factory(table_map), socket SERVER_TYPES[server_type].factory(table_map), socket

@ -3,11 +3,15 @@
import asyncio import asyncio
import logging import logging
import json
class DictProtocol(asyncio.Protocol): class DictProtocol(asyncio.Protocol):
""" Protocol to answer Dovecot dict requests, as implemented in Dict proxy. """ Protocol to answer Dovecot dict requests, as implemented in Dict proxy.
Only a subset of operations is handled properly by this proxy: hello,
lookup and transaction-based set.
There is very little documentation about the protocol, most of it was There is very little documentation about the protocol, most of it was
reverse-engineered from : reverse-engineered from :
@ -20,9 +24,15 @@ class DictProtocol(asyncio.Protocol):
def __init__(self, table_map): def __init__(self, table_map):
self.table_map = table_map self.table_map = table_map
# Minor and major versions are not properly checked yet, but stored
# anyway
self.major_version = None self.major_version = None
self.minor_version = None self.minor_version = None
# Every connection starts with specifying which table is used, dovecot
# tables are called dicts
self.dict = None self.dict = None
# Dictionary of active transaction lists per transaction id
self.transactions = {}
super(DictProtocol, self).__init__() super(DictProtocol, self).__init__()
def connection_made(self, transport): def connection_made(self, transport):
@ -32,14 +42,17 @@ class DictProtocol(asyncio.Protocol):
def data_received(self, data): def data_received(self, data):
logging.debug("Received {}".format(data)) logging.debug("Received {}".format(data))
results = [] results = []
# Every command is separated by "\n"
for line in data.split(b"\n"): for line in data.split(b"\n"):
logging.debug("Line {}".format(line)) # A command must at list have a type and one argument
if len(line) < 2: if len(line) < 2:
continue continue
# The command function will handle the command itself
command = DictProtocol.COMMANDS.get(line[0]) command = DictProtocol.COMMANDS.get(line[0])
if command is None: if command is None:
logging.warning('Unknown command {}'.format(line[0])) logging.warning('Unknown command {}'.format(line[0]))
return self.transport.abort() return self.transport.abort()
# Args are separated by "\t"
args = line[1:].strip().split(b"\t") args = line[1:].strip().split(b"\t")
try: try:
future = command(self, *args) future = command(self, *args)
@ -48,22 +61,30 @@ class DictProtocol(asyncio.Protocol):
except Exception: except Exception:
logging.exception("Error when processing request") logging.exception("Error when processing request")
return self.transport.abort() return self.transport.abort()
logging.debug("Results {}".format(results)) # For asyncio consistency, wait for all results to fire before
# actually returning control
return asyncio.gather(*results) return asyncio.gather(*results)
def process_hello(self, major, minor, value_type, user, dict_name): def process_hello(self, major, minor, value_type, user, dict_name):
""" Process a dict protocol hello message
"""
self.major, self.minor = int(major), int(minor) self.major, self.minor = int(major), int(minor)
logging.debug('Client version {}.{}'.format(self.major, self.minor))
assert self.major == 2
self.value_type = DictProtocol.DATA_TYPES[int(value_type)] self.value_type = DictProtocol.DATA_TYPES[int(value_type)]
self.user = user self.user = user.decode("utf8")
self.dict = self.table_map[dict_name.decode("ascii")] self.dict = self.table_map[dict_name.decode("ascii")]
logging.debug("Value type {}, user {}, dict {}".format( logging.debug("Client {}.{} type {}, user {}, dict {}".format(
self.value_type, self.user, dict_name)) self.major, self.minor, self.value_type, self.user, dict_name))
async def process_lookup(self, key): async def process_lookup(self, key):
""" Process a dict lookup message
"""
logging.debug("Looking up {}".format(key)) logging.debug("Looking up {}".format(key))
result = await self.dict.get(key.decode("utf8")) # Priv and shared keys are handled slighlty differently
key_type, key = key.decode("utf8").split("/", 1)
result = await self.dict.get(
key, ns=(self.user if key_type == "priv" else None)
)
# Handle various response types
if result is not None: if result is not None:
if type(result) is str: if type(result) is str:
response = result.encode("utf8") response = result.encode("utf8")
@ -75,6 +96,33 @@ class DictProtocol(asyncio.Protocol):
else: else:
return self.reply(b"N") return self.reply(b"N")
def process_begin(self, transaction_id):
""" Process a dict begin message
"""
self.transactions[transaction_id] = {}
def process_set(self, transaction_id, key, value):
""" Process a dict set message
"""
# Nothing is actually set until everything is commited
self.transactions[transaction_id][key] = value
async def process_commit(self, transaction_id):
""" Process a dict commit message
"""
# Actually handle all set operations from the transaction store
results = []
for key, value in self.transactions[transaction_id].items():
logging.debug("Storing {}={}".format(key, value))
key_type, key = key.decode("utf8").split("/", 1)
result = await self.dict.set(
key, json.loads(value),
ns=(self.user if key_type == "priv" else None)
)
# Remove stored transaction
del self.transactions[transaction_id]
return self.reply(b"O", transaction_id)
def reply(self, command, *args): def reply(self, command, *args):
logging.debug("Replying {} with {}".format(command, args)) logging.debug("Replying {} with {}".format(command, args))
self.transport.write(command) self.transport.write(command)
@ -91,5 +139,8 @@ class DictProtocol(asyncio.Protocol):
COMMANDS = { COMMANDS = {
ord("H"): process_hello, ord("H"): process_hello,
ord("L"): process_lookup ord("L"): process_lookup,
ord("B"): process_begin,
ord("C"): process_commit,
ord("S"): process_set
} }

@ -51,6 +51,8 @@ class NetstringProtocol(asyncio.Protocol):
self.string_received(string) self.string_received(string)
def string_received(self, string): def string_received(self, string):
""" A new netstring was received
"""
pass pass
def send_string(self, string): def send_string(self, string):
@ -81,16 +83,14 @@ class SocketmapProtocol(NetstringProtocol):
self.transport = transport self.transport = transport
def string_received(self, string): def string_received(self, string):
# The postfix format contains a space for separating the map name and
# the key
space = string.find(0x20) space = string.find(0x20)
if space != -1: if space != -1:
name = string[:space].decode('ascii') name = string[:space].decode('ascii')
key = string[space+1:].decode('utf8') key = string[space+1:].decode('utf8')
return asyncio.async(self.process_request(name, key)) return asyncio.async(self.process_request(name, key))
def send_string(self, string):
logging.debug("Send {}".format(string))
super(SocketmapProtocol, self).send_string(string)
async def process_request(self, name, key): async def process_request(self, name, key):
""" Process a request by querying the provided map. """ Process a request by querying the provided map.
""" """

@ -16,11 +16,30 @@ class UrlTable(object):
""" """
self.url_pattern = url_pattern.replace('§', '{}') self.url_pattern = url_pattern.replace('§', '{}')
async def get(self, key): async def get(self, key, ns=None):
logging.debug("Getting {} from url table".format(key)) """ Get the given key in the provided namespace
"""
if ns is not None:
key += "/" + ns
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(self.url_pattern.format(key)) as request: async with session.get(self.url_pattern.format(key)) as request:
if request.status == 200: if request.status == 200:
result = await request.json() result = await request.json()
logging.debug("Got {} from url table".format(result)) return result
async def set(self, key, value, ns=None):
""" Set a value for the given key in the provided namespace
"""
if ns is not None:
key += "/" + ns
async with aiohttp.ClientSession() as session:
await session.post(self.url_pattern.format(key), json=value)
async def iter(self, cat):
""" Iterate the given key (experimental)
"""
async with aiohttp.ClientSession() as session:
async with session.get(self.url_pattern.format(cat)) as request:
if request.status == 200:
result = await request.json()
return result return result

@ -9,14 +9,22 @@ def main():
""" Run a podop server based on CLI arguments """ Run a podop server based on CLI arguments
""" """
parser = argparse.ArgumentParser("Postfix and Dovecot proxy") parser = argparse.ArgumentParser("Postfix and Dovecot proxy")
parser.add_argument("--socket", help="path to the socket", required=True) parser.add_argument("--socket", required=True,
parser.add_argument("--mode", choices=SERVER_TYPES.keys(), required=True) help="path to the listening unix socket")
parser.add_argument("--name", help="name of the table", action="append") parser.add_argument("--mode", choices=SERVER_TYPES.keys(), required=True,
parser.add_argument("--type", choices=TABLE_TYPES.keys(), action="append") help="select which server will connect to Podop")
parser.add_argument("--param", help="table parameter", action="append") parser.add_argument("--name", action="append",
help="name of each configured table")
parser.add_argument("--type", choices=TABLE_TYPES.keys(), action="append",
help="type of each configured table")
parser.add_argument("--param", action="append",
help="mandatory param for each table configured")
parser.add_argument("-v", "--verbose", dest="verbosity",
action="count", default=0,
help="increases log verbosity for each occurence.")
args = parser.parse_args() args = parser.parse_args()
run_server( run_server(
args.mode, args.socket, args.verbosity, args.mode, args.socket,
zip(args.name, args.type, args.param) if args.name else [] zip(args.name, args.type, args.param) if args.name else []
) )

@ -7,7 +7,7 @@ with open("README.md", "r") as fh:
setup( setup(
name="podop", name="podop",
version="0.1.1", version="0.2",
description="Postfix and Dovecot proxy", description="Postfix and Dovecot proxy",
long_description=long_description, long_description=long_description,
long_description_content_type="text/markdown", long_description_content_type="text/markdown",

Loading…
Cancel
Save