2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-08 08:38:33 +00:00

Remove eventlet dependency, migrate to asyncio/concurrent.futures

Replace eventlet.greenpool with concurrent.futures.ThreadPoolExecutor
in the BMC discovery script, using as_completed() for proper exception
propagation and main-thread result aggregation to avoid race conditions.

Remove dead eventlet socket compatibility code (.fd attribute checks)
from the IPMI session layer, and clean up stale eventlet references
in comments across the codebase.

Closes: xcat2/confluent#197
This commit is contained in:
Vinícius Ferrão
2026-05-02 22:56:39 -03:00
parent b7f6c158ea
commit a69d828e69
4 changed files with 29 additions and 36 deletions
@@ -57,12 +57,8 @@ except AttributeError:
# in case of congestion
initialtimeout = 0.5
# the thread in which all IO will be performed
# While the model as-is works fine for it's own coroutine
# structure, when combined with threading or something like
# eventlet, it becomes difficult for the calling code to cope
# This thread will tuck away the threading situation such that
# calling code doesn't have to do any gymnastics to cope with
# the nature of things.
# This thread tucks away the threading situation such that
# calling code doesn't have to do any gymnastics.
iothread = None
# whether io thread is yet ready to work
iothreadready = False
@@ -186,24 +182,13 @@ async def _io_wait(timeout, myaddr=None, evq=None):
evq.append(evt)
deadline = timeout + _monotonic_time()
ioqueue.append((deadline, evt, myaddr))
# Unfortunately, at least with eventlet patched threading, the wait()
# is a somewhat busy wait if given a deadline. Workaround by having
# it piggy back on the select() in the io thread, which is a truly
# lazy wait even with eventlet involvement
if deadline < selectdeadline:
intsock = iosockets[0]
if hasattr(intsock, 'fd'):
# if in eventlet, go for the true sendto, which is less glitchy
intsock = intsock.fd
intsock.sendto(b'\x01', (myself, iosockets[0].getsockname()[1]))
iosockets[0].sendto(b'\x01', (myself, iosockets[0].getsockname()[1]))
await evt.wait()
def _io_sendto(mysocket, packet, sockaddr):
# Want sendto to act reasonably sane..
mysocket.setblocking(1)
if hasattr(mysocket, 'fd'):
mysocket = mysocket.fd
try:
mysocket.sendto(packet, sockaddr)
except Exception:
-2
View File
@@ -5,8 +5,6 @@ import socket
import sys
import confluent.tasks as tasks
#this will ultimately fill the role of the 'backdoor' of eventlet
# since we have to asyncio up the input and output, we use InteractiveInterpreter and handle the
# input ourselves, since code is not asyncio friendly in and of itself
#code.InteractiveConsole().interact()
+2 -3
View File
@@ -199,9 +199,8 @@ async def sync_list_to_node(sl, node, suffixes, peerip=None):
'rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip))
except Exception as e:
if 'CalledProcessError' not in repr(e):
# https://github.com/eventlet/eventlet/issues/413
# for some reason, can't catch the calledprocesserror normally
# for this exception, implement a hack workaround
# CalledProcessError can't be caught normally in some contexts,
# so check via repr as a workaround
raise
unreadablefiles = []
for root, dirnames, filenames in os.walk(targdir):
+24 -13
View File
@@ -32,18 +32,17 @@
import sys
sys.path.append('/opt/confluent/lib/python')
import concurrent.futures
import confluent.client as cli
import eventlet.greenpool
import gzip
import io
import json
import os
import struct
import subprocess
import pyghmi.util.webclient as webclient
import time
webclient = eventlet.import_patched('pyghmi.util.webclient')
bmcsbyuuid = {}
def checkfish(addr, mac):
@@ -57,10 +56,10 @@ def checkfish(addr, mac):
try:
body = json.loads(body)
except json.decoder.JSONDecodeError:
return
return None
uuid = body.get('UUID', None)
if not uuid:
return
return None
#This part is needed if a bmc sticks 'wire format' uuid in the json body
#Should be skipped for bmcs that present it sanely
uuidparts = uuid.split('-')
@@ -68,14 +67,10 @@ def checkfish(addr, mac):
uuidparts[1] = '{:04x}'.format(struct.unpack('!H', struct.pack('<H', int(uuidparts[1], 16)))[0])
uuidparts[2] = '{:04x}'.format(struct.unpack('!H', struct.pack('<H', int(uuidparts[2], 16)))[0])
uuid = '-'.join(uuidparts)
if uuid in bmcsbyuuid:
bmcsbyuuid[uuid]['bmcs'][mac] = addr
else:
bmcsbyuuid[uuid] = {'bmcs': {mac: addr}}
return (uuid, mac, addr)
if __name__ == '__main__':
gpool = eventlet.greenpool.GreenPool()
with open('/var/lib/dhcpd/dhcpd.leases', 'r') as leasefile:
leases = leasefile.read()
inlease = False
@@ -115,9 +110,25 @@ if __name__ == '__main__':
for inf in macinfo:
if inf.get('possiblenode', None):
mactonode[mac] = inf['possiblenode']
for mac in sorted(mactonode):
gpool.spawn(checkfish, mactoips[mac], mac)
gpool.waitall()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {}
for mac in sorted(mactonode):
futures[executor.submit(checkfish, mactoips[mac], mac)] = mac
for future in concurrent.futures.as_completed(futures):
mac = futures[future]
try:
result = future.result()
except Exception as e:
sys.stderr.write('Failed to probe {}: {}\n'.format(
mactoips[mac], e))
continue
if result is None:
continue
uuid, mac, addr = result
if uuid in bmcsbyuuid:
bmcsbyuuid[uuid]['bmcs'][mac] = addr
else:
bmcsbyuuid[uuid] = {'bmcs': {mac: addr}}
for uuid in sorted(bmcsbyuuid):
macd = bmcsbyuuid[uuid]['bmcs']
macs = sorted(macd)