2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-10 01:53:09 +00:00

Remove use of deprecated get_event_loop

This commit is contained in:
Jarrod Johnson
2026-06-04 13:36:11 -04:00
parent 86abdc4257
commit ac9fd075b9
29 changed files with 57 additions and 57 deletions
+1 -1
View File
@@ -440,7 +440,7 @@ class Command(object):
libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0)
sreader = asyncio.StreamReader()
sreaderprot = asyncio.StreamReaderProtocol(sreader)
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
tport, _ = await cloop.create_connection(
lambda: sreaderprot, sock=self.connection, ssl=ctx, server_hostname='x')
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop)
+5 -5
View File
@@ -118,7 +118,7 @@ def _sendmsg(loop, fut, sock, msg, fds, rfd):
def send_fds(sock, msg, fds):
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
fut = cloop.create_future()
_sendmsg(cloop, fut, sock, msg, fds, None)
return fut
@@ -148,7 +148,7 @@ def _recvmsg(loop, fut, sock, msglen, maxfds, rfd):
def recv_fds(sock, msglen, maxfds):
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
fut = cloop.create_future()
_recvmsg(cloop, fut, sock, msglen, maxfds, None)
return fut
@@ -195,7 +195,7 @@ async def sendall(handle, data):
handle[1].write(data)
return await handle[1].drain()
else:
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
return await cloop.sock_sendall(handle, data)
def get_socket(handle):
@@ -212,7 +212,7 @@ async def close(handle):
handle.close()
async def send(handle, data, filehandle=None):
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
if isinstance(data, unicode):
try:
data = data.encode('utf-8')
@@ -256,7 +256,7 @@ async def _grabhdl(handle, size):
if isinstance(handle, tuple):
return await handle[0].read(size)
else:
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
return await cloop.sock_recv(handle, size)
@@ -117,7 +117,7 @@ async def watchsockets():
currfutures = []
for currsock in iosockets:
currfut = asyncio.Future()
asyncio.get_event_loop().add_reader(currsock, sock_completion, asyncio.get_event_loop(), currsock, currfut)
asyncio.get_running_loop().add_reader(currsock, sock_completion, asyncio.get_running_loop(), currsock, currfut)
currfutures.append(currfut)
#select.select(iosockets, (), (), timeout)
done, futures = await asyncio.wait(currfutures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
@@ -397,7 +397,7 @@ class Session(object):
if iothread is None:
initevt = asyncio.Event()
iothreadwaiters.append(initevt)
iothread = asyncio.get_event_loop().create_task(watchsockets())
iothread = asyncio.get_running_loop().create_task(watchsockets())
await initevt.wait()
elif not iothreadready:
initevt = asyncio.Event()
+2 -2
View File
@@ -86,7 +86,7 @@ if args[0] in ('restore', 'merge'):
skipped=skipped,
format=format)
asyncio.get_event_loop().run_until_complete(dp)
asyncio.run(dp)
if skipped['nodes']:
skippedn = ','.join(skipped['nodes'])
print('The following nodes were skipped during merge: '
@@ -129,6 +129,6 @@ elif args[0] == 'dump':
format = 'yaml' if options.yaml else 'json'
dp = cfm.dump_db_to_directory(dumpdir, password, options.redact,
options.skipkeys, format=format)
asyncio.get_event_loop().run_until_complete(dp)
asyncio.run(dp)
+1 -1
View File
@@ -108,7 +108,7 @@ async def run_handler(hdlr, req):
except KeyError:
raise exc.InvalidArgumentException(
'Invalid Session ID or missing request id')
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
cloop.create_task(asyncsession.run_handler(hdlr, requestid))
return requestid
+2 -2
View File
@@ -327,7 +327,7 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None,
#pam won't work if the user doesn't exist, don't go further
await asyncio.sleep(0.05) # stall even on test for existence of a username
return None
usergood = await asyncio.get_event_loop().run_in_executor(authworkers, pam_check, pwe, user, passphrase)
usergood = await asyncio.get_running_loop().run_in_executor(authworkers, pam_check, pwe, user, passphrase)
if usergood:
if bpassphrase:
_passcache[(user, tenant)] = hashlib.sha256(bpassphrase).digest()
@@ -393,5 +393,5 @@ async def _do_pbkdf(passphrase, salt):
# compute. However, we do want to wait for result, so we have
# one of the exceedingly rare sort of circumstances where 'apply'
# actually makes sense
res = await asyncio.get_event_loop().run_in_executor(authworkers, _apply_pbkdf, passphrase, salt)
res = await asyncio.get_running_loop().run_in_executor(authworkers, _apply_pbkdf, passphrase, salt)
return res
@@ -233,7 +233,7 @@ async def follow_leader(remote, leader):
random.random(), start_collective)
async def _create_tls_connection(host, port):
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
ainfo = await cloop.getaddrinfo(
host, port, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM)
for res in ainfo:
@@ -408,7 +408,7 @@ async def handle_connection(connection, cert, request, local=False):
return
host = request['server']
try:
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
ainfo = await cloop.getaddrinfo(
host, 13001, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM)
for res in ainfo:
@@ -389,7 +389,7 @@ async def exec_on_leader(function, *args):
xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
while xid in _pendingchangesets:
xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
_pendingchangesets[xid] = cloop.create_future() # future instead of event
rpcpayload = msgpack.packb({'function': function, 'args': args,
'xid': xid}, use_bin_type=False)
+2 -2
View File
@@ -65,7 +65,7 @@ class CredServer(object):
apiarmed = None
hmackey = None
hmacval = None
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
await cloop.sock_sendall(client, b'\xc2\xd1-\xa8\x80\xd8j\xba')
tlv = bytearray(await cloop.sock_recv(client, 2))
if tlv[0] != 1:
@@ -150,4 +150,4 @@ async def main():
while True:
await asyncio.sleep(86400)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
+1 -1
View File
@@ -55,7 +55,7 @@ async def interact(cloop, cnn):
async def srv_debug(sock):
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
while True:
cnn, addr = await cloop.sock_accept(sock)
tasks.spawn(interact(cloop, cnn))
+1 -1
View File
@@ -1764,4 +1764,4 @@ async def main():
await asyncio.sleep(30)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
@@ -28,7 +28,7 @@ async def remote_nodecfg(nodename, cfm):
ipaddr = cfg.get(nodename, {}).get('hardwaremanagement.manager', {}).get(
'value', None)
ipaddr = ipaddr.split('/', 1)[0]
ipaddr = (await asyncio.get_event_loop().getaddrinfo(ipaddr, 0))[0][-1]
ipaddr = (await asyncio.get_running_loop().getaddrinfo(ipaddr, 0))[0][-1]
if not ipaddr:
raise Exception('Cannot remote configure a system without known '
'address')
@@ -98,7 +98,7 @@ class NodeHandler(bmchandler.NodeHandler):
if smmip:
smmip = smmip.split('/', 1)[0]
if smmip and ':' not in smmip:
smmip = await asyncio.get_event_loop().getaddrinfo(smmip, 0)[0]
smmip = await asyncio.get_running_loop().getaddrinfo(smmip, 0)[0]
smmip = smmip[-1][0]
if smmip and ':' in smmip:
raise exc.NotImplementedException('IPv6 not supported')
@@ -46,7 +46,7 @@ async def remote_nodecfg(nodename, cfm):
ipaddr = cfg.get(nodename, {}).get('hardwaremanagement.manager', {}).get(
'value', None)
ipaddr = ipaddr.split('/', 1)[0]
ipaddr = await asyncio.get_event_loop().getaddrinfo(ipaddr, 0)[0][-1]
ipaddr = await asyncio.get_running_loop().getaddrinfo(ipaddr, 0)[0][-1]
if not ipaddr:
raise Exception('Cannot remote configure a system without known '
'address')
+2 -2
View File
@@ -55,7 +55,7 @@ async def handle_connection(incoming, outgoing):
async def forward_port(sock, target, clientip, sessionid):
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
sock.setblocking(False)
while True:
conn, cli = await loop.sock_accept(sock)
@@ -79,7 +79,7 @@ async def forward_video():
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('::', 3900, 0, 0))
sock.listen(50)
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
sock.setblocking(False)
while True:
conn, cli = await loop.sock_accept(sock)
+2 -2
View File
@@ -1241,9 +1241,9 @@ class HttpApi(object):
global auditlog
global tracelog
if _cleaner is None:
_cleaner = asyncio.get_event_loop().create_task(
_cleaner = asyncio.get_running_loop().create_task(
_sessioncleaner())
tracelog = log.Logger('trace')
auditlog = log.Logger('audit')
self.server = asyncio.get_event_loop().create_task(
self.server = asyncio.get_running_loop().create_task(
serve(self.bind_host, self.bind_port, self.bind_group, self.bind_perms))
+2 -2
View File
@@ -46,7 +46,7 @@ async def node_by_manager(manager):
"""
manageraddresses = []
for tmpaddr in await asyncio.get_event_loop().getaddrinfo(manager, None):
for tmpaddr in await asyncio.get_running_loop().getaddrinfo(manager, None):
manageraddresses.append(tmpaddr[4][0])
cfm = configmanager.ConfigManager(None)
if manager in manager_to_nodemap:
@@ -68,7 +68,7 @@ async def node_by_manager(manager):
if currhm in manageraddresses:
manager_to_nodemap[manager] = node
return node
for curraddr in await asyncio.get_event_loop().getaddrinfo(currhm, None):
for curraddr in await asyncio.get_running_loop().getaddrinfo(currhm, None):
curraddr = curraddr[4][0]
if curraddr in manageraddresses:
manager_to_nodemap[manager] = node
+2 -2
View File
@@ -320,7 +320,7 @@ async def asyncrun(args):
_redirectoutput()
if havefcntl:
_updatepidfile()
asyncio.get_event_loop().set_debug(True)
asyncio.get_running_loop().set_debug(True)
signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGTERM, terminate)
atexit.register(doexit)
@@ -338,7 +338,7 @@ async def asyncrun(args):
sock_bind_host, sock_bind_port, sock_bind_group, sock_bind_perms = _get_connector_config('socket')
try:
sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port, sock_bind_group, sock_bind_perms)
asyncio.get_event_loop().create_task(sockservice.start())
asyncio.get_running_loop().create_task(sockservice.start())
except NameError:
pass
webservice = httpapi.HttpApi(http_bind_host, http_bind_port, http_bind_group, http_bind_perms)
+1 -1
View File
@@ -51,7 +51,7 @@ async def handle_request(configmanager, inputdata, pathcomponents, operation):
async def requestmount(subdir, filename):
await assure_browserfs()
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
a = socket.socket(socket.AF_UNIX)
a.settimeout(0)
await cloop.sock_connect(a, '/var/run/confluent/browserfs/control')
+1 -1
View File
@@ -47,7 +47,7 @@ async def _update_neigh():
nlhdr = b'\x1c\x00\x00\x00\x1e\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00'
# ndmsg struct u8 family u8 pad, u16 pad, s32 ifidx, u16 state, u8 flags, u8 type
ndmsg= b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
await cloop.sock_sendall(s, nlhdr + ndmsg)
#s.sendall(nlhdr + ndmsg)
neightable = {}
+2 -2
View File
@@ -777,10 +777,10 @@ async def get_my_addresses(idx=0, family=0, matchlla=None):
s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE)
s.bind((0, 0))
s.setblocking(False)
await asyncio.get_event_loop().sock_sendall(s, nlhdr + ifaddrmsg)
await asyncio.get_running_loop().sock_sendall(s, nlhdr + ifaddrmsg)
addrs = []
while True:
pdata = await asyncio.get_event_loop().sock_recv(s, 65536)
pdata = await asyncio.get_running_loop().sock_recv(s, 65536)
v = memoryview(pdata)
if struct.unpack('H', v[4:6])[0] == 3: # netlink done message
break
@@ -226,7 +226,7 @@ async def _offload_map_switch(switch, password, user, privprotocol=None):
evtid = random.randint(0, 4294967295)
while evtid in _offloadevts:
evtid = random.randint(0, 4294967295)
_offloadevts[evtid] = asyncio.get_event_loop().create_future()
_offloadevts[evtid] = asyncio.get_running_loop().create_future()
_offloader.stdin.write(msgpack.packb((evtid, switch, password, user, privprotocol),
use_bin_type=True))
#_offloader.stdin.flush()
@@ -251,7 +251,7 @@ async def _start_offloader():
#fl = fcntl.fcntl(_offloader.stdout.fileno(), fcntl.F_GETFL)
#fcntl.fcntl(_offloader.stdout.fileno(),
# fcntl.F_SETFL, fl | os.O_NONBLOCK)
asyncio.get_event_loop().create_task(_recv_offload())
asyncio.get_running_loop().create_task(_recv_offload())
async def _recv_offload():
@@ -740,7 +740,7 @@ async def rescan(cfg):
pass
async def get_stdin_reader():
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await cloop.connect_read_pipe(lambda: protocol, sys.stdin)
@@ -80,7 +80,7 @@ class WebConnection(wc.WebConnection):
async def connect(self):
if self.secure:
return super(WebConnection, self).connect()
addrinfo = (await asyncio.get_event_loop().getaddrinfo(self.host, self.port))[0]
addrinfo = (await asyncio.get_running_loop().getaddrinfo(self.host, self.port))[0]
# workaround problems of too large mtu, moderately frequent occurance
# in this space
plainsock = socket.socket(addrinfo[0])
+3 -3
View File
@@ -37,7 +37,7 @@ running_status = {}
async def recv_exact(conn, size):
data = b''
while len(data) < size:
chunk = await asyncio.get_event_loop().sock_recv(conn, size - len(data))
chunk = await asyncio.get_running_loop().sock_recv(conn, size - len(data))
if not chunk:
return None
data += chunk
@@ -138,7 +138,7 @@ class PlayRunner(object):
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=localenv)
conn, _ = await asyncio.get_event_loop().sock_accept(feedback)
conn, _ = await asyncio.get_running_loop().sock_accept(feedback)
conn.setblocking(False)
try:
result = True
@@ -153,7 +153,7 @@ class PlayRunner(object):
if result is not None:
self.results.append(result)
finally:
await asyncio.get_event_loopy().sock_close(conn)
await asyncio.get_running_loop().sock_close(conn)
stdout, stder = await worker.communicate()
self.stderr += stder.decode('utf8')
self.stdout += stdout.decode('utf8')
+1 -1
View File
@@ -236,7 +236,7 @@ async def handle_request(req, make_response, mimetype):
if not bmcaddr:
return await make_response(mimetype, 500, 'Internal Server Error', body='Missing value in hardwaremanagement.manager')
bmcaddr = bmcaddr.split('/', 1)[0]
bmcaddr = await asyncio.get_event_loop().getaddrinfo(bmcaddr, 0)[0]
bmcaddr = await asyncio.get_running_loop().getaddrinfo(bmcaddr, 0)[0]
bmcaddr = bmcaddr[-1][0]
if '.' in bmcaddr: # ipv4 is allowed
netconfig = await netutil.get_nic_config(cfg, nodename, ip=bmcaddr)
+1 -1
View File
@@ -29,7 +29,7 @@ import pysnmp.smi.rfc1902 as rfc1902
async def _get_transport(name):
# Annoyingly, pysnmp does not automatically determine ipv6 v ipv4
res = await asyncio.get_event_loop().getaddrinfo(name, 161, type=socket.SOCK_DGRAM)
res = await asyncio.get_running_loop().getaddrinfo(name, 161, type=socket.SOCK_DGRAM)
if res[0][0] == socket.AF_INET6:
return await snmp.Udp6TransportTarget.create(res[0][4], 2)
else:
+6 -6
View File
@@ -376,7 +376,7 @@ async def _tlshandler(bind_host, bind_port):
plainsocket.setsockopt(socket.SOL_TCP, 23, 5)
plainsocket.listen(5)
cs = credserver.CredServer()
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
while (1): # TODO: exithook
cnn, addr = await cloop.sock_accept(plainsocket)
if addr[1] < 1000:
@@ -407,7 +407,7 @@ class PySSLContext(ctypes.Structure):
async def _tlsstartup(cnn):
authname = None
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
cert = None
conf.init_config()
configfile = conf.get_config()
@@ -446,7 +446,7 @@ def removesocket():
pass
async def _unixdomainhandler(bind_group=None, bind_perms=None):
aloop = asyncio.get_event_loop()
aloop = asyncio.get_running_loop()
if not bind_perms:
bind_perms = 0o666
unixsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -509,7 +509,7 @@ class SockApi(object):
if self.should_run_remoteapi():
self.start_remoteapi()
else:
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
tasks.spawn(self.watch_for_cert())
self.unixdomainserver = tasks.spawn_task(_unixdomainhandler(self.bind_group, self.bind_perms))
@@ -518,10 +518,10 @@ class SockApi(object):
if libc.inotify_add_watch(watcher, b'/etc/confluent/', 0x100) > -1:
while True:
currfut = asyncio.Future()
asyncio.get_event_loop().add_reader(
asyncio.get_running_loop().add_reader(
watcher, currfut.set_result, None)
currfut.add_done_callback(
lambda x: asyncio.get_event_loop().remove_reader(watcher))
lambda x: asyncio.get_running_loop().remove_reader(watcher))
done, _ = await asyncio.wait([currfut], return_when=asyncio.FIRST_COMPLETED)
for currfut in done:
await currfut
+1 -1
View File
@@ -156,7 +156,7 @@ def spawn_task(coro):
try:
return asyncio.create_task(coro)
except AttributeError:
return asyncio.get_event_loop().create_task(coro)
return asyncio.get_running_loop().create_task(coro)
def spawn(coro):
+6 -6
View File
@@ -94,7 +94,7 @@ async def close_session(sessionid):
async def send_grant(conn, nodename, rqtype):
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
parmcallback = _nodeparms.get(nodename, None)
cookies = {}
protos = []
@@ -121,7 +121,7 @@ async def send_grant(conn, nodename, rqtype):
else:
# original openbmc dialect
portnum = 443
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
cfg = configmanager.ConfigManager(None)
c = cfg.get_node_attributes(
nodename,
@@ -204,7 +204,7 @@ async def send_grant(conn, nodename, rqtype):
async def recv_exact(conn, n):
#TODO:asyncmerge: review recv_exact usage
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
retdata = b''
while len(retdata) < n:
currdata = await cloop.sock_recv(conn, n - len(retdata))
@@ -216,7 +216,7 @@ async def recv_exact(conn, n):
async def evaluate_request(conn):
allow = False
authname = None
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
try:
creds = conn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED,
struct.calcsize('iII'))
@@ -275,7 +275,7 @@ async def evaluate_request(conn):
conn.close()
async def monitor_requests():
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove('/var/run/confluent/vinz/approval')
@@ -291,7 +291,7 @@ async def monitor_requests():
async def request_session(nodename):
await assure_vinz()
cloop = asyncio.get_event_loop()
cloop = asyncio.get_running_loop()
a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
a.setblocking(0)
await cloop.sock_connect(a, '/var/run/confluent/vinz/control')