mirror of
https://github.com/xcat2/confluent.git
synced 2026-05-09 02:00:11 +00:00
Implement unix socket permission controls
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
# This SCGI server provides a http wrap to confluent api
|
||||
# It additionally manages httprequest console sessions
|
||||
import base64
|
||||
import shutil
|
||||
|
||||
import aiohttp
|
||||
try:
|
||||
@@ -1166,13 +1167,15 @@ async def _assemble_json(responses, resource=None, url=None, extension=None):
|
||||
rspdata, sort_keys=True, indent=4, ensure_ascii=False).encode('utf-8'))
|
||||
|
||||
|
||||
async def serve(bind_host, bind_port):
|
||||
async def serve(bind_host, bind_port, bind_group, bind_perms):
|
||||
# TODO(jbjohnso): move to unix socket and explore
|
||||
# either making apache deal with it
|
||||
# or just supporting nginx or lighthttpd
|
||||
# for now, http port access
|
||||
# todo remains unix domain socket for even http
|
||||
sock = None
|
||||
if not bind_perms:
|
||||
bind_perms = 0o666
|
||||
while not sock:
|
||||
try:
|
||||
bind_arg = None
|
||||
@@ -1181,16 +1184,21 @@ async def serve(bind_host, bind_port):
|
||||
os.remove(bind_host)
|
||||
except OSError:
|
||||
pass
|
||||
sock = socket.socket(socket.AF_UNIX)
|
||||
os.chmod(bind_host, 0o660)
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
oldumask = os.umask(0o777 - bind_perms)
|
||||
sock.bind(bind_host)
|
||||
os.chmod(bind_host, bind_perms)
|
||||
if bind_group:
|
||||
shutil.chown(bind_host, group=bind_group)
|
||||
os.umask(oldumask)
|
||||
bind_arg = bind_host
|
||||
else:
|
||||
bindinfo = socket.getaddrinfo(
|
||||
bind_host, bind_port, 0, socket.SOCK_STREAM)
|
||||
if bindinfo[0][0] == socket.AF_INET:
|
||||
sock = socket.socket(socket.AF_INET)
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
elif bindinfo[0][0] == socket.AF_INET6:
|
||||
sock = socket.socket(socket.AF_INET6)
|
||||
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
||||
bind_arg = bindinfo[0][4]
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
@@ -1221,10 +1229,12 @@ async def serve(bind_host, bind_port):
|
||||
|
||||
|
||||
class HttpApi(object):
|
||||
def __init__(self, bind_host=None, bind_port=None):
|
||||
def __init__(self, bind_host=None, bind_port=None, bind_group=None, bind_perms=None):
|
||||
self.server = None
|
||||
self.bind_host = bind_host or '127.0.0.1'
|
||||
self.bind_port = bind_port or 4005
|
||||
self.bind_group = bind_group
|
||||
self.bind_perms = bind_perms
|
||||
|
||||
def start(self):
|
||||
global _cleaner
|
||||
@@ -1236,4 +1246,4 @@ class HttpApi(object):
|
||||
tracelog = log.Logger('trace')
|
||||
auditlog = log.Logger('audit')
|
||||
self.server = asyncio.get_event_loop().create_task(
|
||||
serve(self.bind_host, self.bind_port))
|
||||
serve(self.bind_host, self.bind_port, self.bind_group, self.bind_perms))
|
||||
|
||||
@@ -334,14 +334,14 @@ async def asyncrun(args):
|
||||
auth.check_for_yaml()
|
||||
collective.startup()
|
||||
await consoleserver.initialize()
|
||||
http_bind_host, http_bind_port = _get_connector_config('http')
|
||||
sock_bind_host, sock_bind_port = _get_connector_config('socket')
|
||||
http_bind_host, http_bind_port, http_bind_group, http_bind_perms = _get_connector_config('http')
|
||||
sock_bind_host, sock_bind_port, sock_bind_group, sock_bind_perms = _get_connector_config('socket')
|
||||
try:
|
||||
sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port)
|
||||
sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port, sock_bind_group, sock_bind_perms)
|
||||
asyncio.get_event_loop().create_task(sockservice.start())
|
||||
except NameError:
|
||||
pass
|
||||
webservice = httpapi.HttpApi(http_bind_host, http_bind_port)
|
||||
webservice = httpapi.HttpApi(http_bind_host, http_bind_port, http_bind_group, http_bind_perms)
|
||||
webservice.start()
|
||||
while len(list(configmanager.list_collective())) >= 2:
|
||||
# If in a collective, stall automatic startup activity
|
||||
@@ -373,7 +373,23 @@ async def asyncrun(args):
|
||||
def _get_connector_config(session):
|
||||
host = conf.get_option(session, 'bindhost')
|
||||
port = conf.get_int_option(session, 'bindport')
|
||||
return (host, port)
|
||||
group = conf.get_option(session, 'bindgroup')
|
||||
perms = conf.get_option(session, 'bindperms')
|
||||
if perms:
|
||||
if perms.startswith('0'):
|
||||
perms = int(perms, 8)
|
||||
else:
|
||||
# Parse rw-rw-rw- format (user, group, other)
|
||||
perms_value = 0
|
||||
perm_map = {'r': 4, 'w': 2, 'x': 1}
|
||||
for i, section in enumerate([perms[0:3], perms[3:6], perms[6:9]]):
|
||||
for char in section:
|
||||
if char in perm_map:
|
||||
perms_value += perm_map[char] * (8 ** (2 - i))
|
||||
perms = perms_value
|
||||
else:
|
||||
perms = None
|
||||
return (host, port, group, perms)
|
||||
|
||||
def _get_logdirectory():
|
||||
return conf.get_option('globals', 'logdirectory')
|
||||
@@ -27,6 +27,7 @@ import ctypes.util
|
||||
import errno
|
||||
import os
|
||||
import pwd
|
||||
import shutil
|
||||
import stat
|
||||
import struct
|
||||
import sys
|
||||
@@ -444,8 +445,10 @@ def removesocket():
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
async def _unixdomainhandler():
|
||||
async def _unixdomainhandler(bind_group=None, bind_perms=None):
|
||||
aloop = asyncio.get_event_loop()
|
||||
if not bind_perms:
|
||||
bind_perms = 0o666
|
||||
unixsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
unixsocket.settimeout(0)
|
||||
try:
|
||||
@@ -454,10 +457,12 @@ async def _unixdomainhandler():
|
||||
pass
|
||||
if not os.path.isdir("/var/run/confluent"):
|
||||
os.makedirs('/var/run/confluent', 0o755)
|
||||
oldumask = os.umask(0o777 - bind_perms)
|
||||
unixsocket.bind("/var/run/confluent/api.sock")
|
||||
os.chmod("/var/run/confluent/api.sock",
|
||||
stat.S_IWOTH | stat.S_IROTH | stat.S_IWGRP |
|
||||
stat.S_IRGRP | stat.S_IWUSR | stat.S_IRUSR)
|
||||
os.chmod("/var/run/confluent/api.sock", bind_perms)
|
||||
if bind_group:
|
||||
shutil.chown("/var/run/confluent/api.sock", group=bind_group)
|
||||
os.umask(oldumask)
|
||||
atexit.register(removesocket)
|
||||
unixsocket.listen(5)
|
||||
while True:
|
||||
@@ -487,11 +492,13 @@ async def _unixdomainhandler():
|
||||
|
||||
|
||||
class SockApi(object):
|
||||
def __init__(self, bindhost=None, bindport=None):
|
||||
def __init__(self, bindhost=None, bindport=None, bindgroup=None, bindperms=None):
|
||||
self.tlsserver = None
|
||||
self.unixdomainserver = None
|
||||
self.bind_host = bindhost or '::'
|
||||
self.bind_port = bindport or 13001
|
||||
self.bind_group = bindgroup
|
||||
self.bind_perms = bindperms
|
||||
|
||||
async def start(self):
|
||||
global auditlog
|
||||
@@ -504,7 +511,7 @@ class SockApi(object):
|
||||
else:
|
||||
cloop = asyncio.get_event_loop()
|
||||
tasks.spawn(self.watch_for_cert())
|
||||
self.unixdomainserver = tasks.spawn_task(_unixdomainhandler())
|
||||
self.unixdomainserver = tasks.spawn_task(_unixdomainhandler(self.bind_group, self.bind_perms))
|
||||
|
||||
async def watch_for_cert(self):
|
||||
watcher = libc.inotify_init1(os.O_NONBLOCK)
|
||||
|
||||
Reference in New Issue
Block a user