From 2586393a45b7027e7f093e8a154ab4f2cff9ee92 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 7 May 2026 10:07:08 -0400 Subject: [PATCH] Implement unix socket permission controls --- confluent_server/confluent/httpapi.py | 24 +++++++++++++++++------- confluent_server/confluent/main.py | 26 +++++++++++++++++++++----- confluent_server/confluent/sockapi.py | 19 +++++++++++++------ 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 6c7d4bf7..9462da8b 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -17,6 +17,7 @@ # This SCGI server provides a http wrap to confluent api # It additionally manages httprequest console sessions import base64 +import shutil import aiohttp try: @@ -1166,13 +1167,15 @@ async def _assemble_json(responses, resource=None, url=None, extension=None): rspdata, sort_keys=True, indent=4, ensure_ascii=False).encode('utf-8')) -async def serve(bind_host, bind_port): +async def serve(bind_host, bind_port, bind_group, bind_perms): # TODO(jbjohnso): move to unix socket and explore # either making apache deal with it # or just supporting nginx or lighthttpd # for now, http port access # todo remains unix domain socket for even http sock = None + if not bind_perms: + bind_perms = 0o666 while not sock: try: bind_arg = None @@ -1181,16 +1184,21 @@ async def serve(bind_host, bind_port): os.remove(bind_host) except OSError: pass - sock = socket.socket(socket.AF_UNIX) - os.chmod(bind_host, 0o660) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + oldumask = os.umask(0o777 - bind_perms) + sock.bind(bind_host) + os.chmod(bind_host, bind_perms) + if bind_group: + shutil.chown(bind_host, group=bind_group) + os.umask(oldumask) bind_arg = bind_host else: bindinfo = socket.getaddrinfo( bind_host, bind_port, 0, socket.SOCK_STREAM) if bindinfo[0][0] == socket.AF_INET: - sock = socket.socket(socket.AF_INET) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) elif bindinfo[0][0] == socket.AF_INET6: - sock = socket.socket(socket.AF_INET6) + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) bind_arg = bindinfo[0][4] sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) @@ -1221,10 +1229,12 @@ async def serve(bind_host, bind_port): class HttpApi(object): - def __init__(self, bind_host=None, bind_port=None): + def __init__(self, bind_host=None, bind_port=None, bind_group=None, bind_perms=None): self.server = None self.bind_host = bind_host or '127.0.0.1' self.bind_port = bind_port or 4005 + self.bind_group = bind_group + self.bind_perms = bind_perms def start(self): global _cleaner @@ -1236,4 +1246,4 @@ class HttpApi(object): tracelog = log.Logger('trace') auditlog = log.Logger('audit') self.server = asyncio.get_event_loop().create_task( - serve(self.bind_host, self.bind_port)) + serve(self.bind_host, self.bind_port, self.bind_group, self.bind_perms)) diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index 9ba7a24a..69753b98 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -334,14 +334,14 @@ async def asyncrun(args): auth.check_for_yaml() collective.startup() await consoleserver.initialize() - http_bind_host, http_bind_port = _get_connector_config('http') - sock_bind_host, sock_bind_port = _get_connector_config('socket') + http_bind_host, http_bind_port, http_bind_group, http_bind_perms = _get_connector_config('http') + sock_bind_host, sock_bind_port, sock_bind_group, sock_bind_perms = _get_connector_config('socket') try: - sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port) + sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port, sock_bind_group, sock_bind_perms) asyncio.get_event_loop().create_task(sockservice.start()) except NameError: pass - webservice = httpapi.HttpApi(http_bind_host, http_bind_port) + webservice = httpapi.HttpApi(http_bind_host, http_bind_port, http_bind_group, http_bind_perms) webservice.start() while len(list(configmanager.list_collective())) >= 2: # If in a collective, stall automatic startup activity @@ -373,7 +373,23 @@ async def asyncrun(args): def _get_connector_config(session): host = conf.get_option(session, 'bindhost') port = conf.get_int_option(session, 'bindport') - return (host, port) + group = conf.get_option(session, 'bindgroup') + perms = conf.get_option(session, 'bindperms') + if perms: + if perms.startswith('0'): + perms = int(perms, 8) + else: + # Parse rw-rw-rw- format (user, group, other) + perms_value = 0 + perm_map = {'r': 4, 'w': 2, 'x': 1} + for i, section in enumerate([perms[0:3], perms[3:6], perms[6:9]]): + for char in section: + if char in perm_map: + perms_value += perm_map[char] * (8 ** (2 - i)) + perms = perms_value + else: + perms = None + return (host, port, group, perms) def _get_logdirectory(): return conf.get_option('globals', 'logdirectory') \ No newline at end of file diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 8992b4e9..ec8c1e16 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -27,6 +27,7 @@ import ctypes.util import errno import os import pwd +import shutil import stat import struct import sys @@ -444,8 +445,10 @@ def removesocket(): except OSError: pass -async def _unixdomainhandler(): +async def _unixdomainhandler(bind_group=None, bind_perms=None): aloop = asyncio.get_event_loop() + if not bind_perms: + bind_perms = 0o666 unixsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) unixsocket.settimeout(0) try: @@ -454,10 +457,12 @@ async def _unixdomainhandler(): pass if not os.path.isdir("/var/run/confluent"): os.makedirs('/var/run/confluent', 0o755) + oldumask = os.umask(0o777 - bind_perms) unixsocket.bind("/var/run/confluent/api.sock") - os.chmod("/var/run/confluent/api.sock", - stat.S_IWOTH | stat.S_IROTH | stat.S_IWGRP | - stat.S_IRGRP | stat.S_IWUSR | stat.S_IRUSR) + os.chmod("/var/run/confluent/api.sock", bind_perms) + if bind_group: + shutil.chown("/var/run/confluent/api.sock", group=bind_group) + os.umask(oldumask) atexit.register(removesocket) unixsocket.listen(5) while True: @@ -487,11 +492,13 @@ async def _unixdomainhandler(): class SockApi(object): - def __init__(self, bindhost=None, bindport=None): + def __init__(self, bindhost=None, bindport=None, bindgroup=None, bindperms=None): self.tlsserver = None self.unixdomainserver = None self.bind_host = bindhost or '::' self.bind_port = bindport or 13001 + self.bind_group = bindgroup + self.bind_perms = bindperms async def start(self): global auditlog @@ -504,7 +511,7 @@ class SockApi(object): else: cloop = asyncio.get_event_loop() tasks.spawn(self.watch_for_cert()) - self.unixdomainserver = tasks.spawn_task(_unixdomainhandler()) + self.unixdomainserver = tasks.spawn_task(_unixdomainhandler(self.bind_group, self.bind_perms)) async def watch_for_cert(self): watcher = libc.inotify_init1(os.O_NONBLOCK)