2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-09 09:33:09 +00:00

Merge pull request #210 from VersatusHPC/remove-eventlet

Comment fixup and attempted removal of eventlet workarounds that shouldn't be needed
This commit is contained in:
Jarrod Johnson
2026-05-20 15:43:06 -04:00
committed by GitHub
10 changed files with 42 additions and 68 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ ADD stdeb.patch /tmp/
ADD buildapt.sh /bin/
ADD distributions.tmpl /bin/
RUN ["apt-get", "update"]
RUN ["apt-get", "install", "-y", "reprepro", "python3-stdeb", "gnupg-agent", "devscripts", "debhelper", "libsoap-lite-perl", "libdbi-perl", "quilt", "git", "python3-pyparsing", "python3-dnspython", "python3-eventlet", "python3-netifaces", "python3-paramiko", "dh-python", "libjson-perl", "ronn", "alien", "gcc", "make"]
RUN ["apt-get", "install", "-y", "reprepro", "python3-stdeb", "gnupg-agent", "devscripts", "debhelper", "libsoap-lite-perl", "libdbi-perl", "quilt", "git", "python3-pyparsing", "python3-dnspython", "python3-netifaces", "python3-paramiko", "dh-python", "libjson-perl", "ronn", "alien", "gcc", "make"]
RUN ["mkdir", "-p", "/sources/git/"]
RUN ["mkdir", "-p", "/debs/"]
RUN ["mkdir", "-p", "/apt/"]
+1 -2
View File
@@ -138,8 +138,7 @@ class Command(object):
:param bmc: hostname or ip address of the BMC (default is local)
:param userid: username to use to connect (default to no user)
:param password: password to connect to the BMC (defaults to no password)
:param onlogon: function to run when logon completes in an asynchronous
fashion. This will result in a greenthread behavior.
:param onlogon: function to run when logon completes asynchronously.
:param kg: Optional parameter to use if BMC has a particular Kg configured
:param verifycallback: For OEM extensions that use HTTPS, this function
will be used to evaluate the certificate.
+1 -1
View File
@@ -421,7 +421,7 @@ class Console(object):
If a caller is a simple little utility, provide a function to
eternally run the event loop. More complicated usage would be expected
to provide their own event loop behavior, though this could be used
within the greenthread implementation of caller's choice if desired.
within the async implementation of caller's choice if desired.
"""
# wait_for_rsp promises to return a false value when no sessions are
# alive anymore
@@ -56,13 +56,8 @@ except AttributeError:
# session. This will be randomized to stagger out retries
# in case of congestion
initialtimeout = 0.5
# the thread in which all IO will be performed
# While the model as-is works fine for it's own coroutine
# structure, when combined with threading or something like
# eventlet, it becomes difficult for the calling code to cope
# This thread will tuck away the threading situation such that
# calling code doesn't have to do any gymnastics to cope with
# the nature of things.
# the thread in which all IO will be performed, so that
# calling code doesn't have to manage threading directly
iothread = None
# whether io thread is yet ready to work
iothreadready = False
@@ -186,24 +181,13 @@ async def _io_wait(timeout, myaddr=None, evq=None):
evq.append(evt)
deadline = timeout + _monotonic_time()
ioqueue.append((deadline, evt, myaddr))
# Unfortunately, at least with eventlet patched threading, the wait()
# is a somewhat busy wait if given a deadline. Workaround by having
# it piggy back on the select() in the io thread, which is a truly
# lazy wait even with eventlet involvement
if deadline < selectdeadline:
intsock = iosockets[0]
if hasattr(intsock, 'fd'):
# if in eventlet, go for the true sendto, which is less glitchy
intsock = intsock.fd
intsock.sendto(b'\x01', (myself, iosockets[0].getsockname()[1]))
iosockets[0].sendto(b'\x01', (myself, iosockets[0].getsockname()[1]))
await evt.wait()
def _io_sendto(mysocket, packet, sockaddr):
# Want sendto to act reasonably sane..
mysocket.setblocking(1)
if hasattr(mysocket, 'fd'):
mysocket = mysocket.fd
try:
mysocket.sendto(packet, sockaddr)
except Exception:
@@ -864,7 +848,7 @@ class Session(object):
# within a process. In this way, synchronous usage of the interface
# plays well with asynchronous use. In fact, this produces the
# behavior of only the constructor needing a callback. From then on,
# synchronous usage of the class acts in a greenthread style governed
# synchronous usage of the class acts in a coroutine style governed
# by order of data on the network
await self.awaitresponse(retry, netfn + 1, command)
lastresponse = self.lastresponse
@@ -486,7 +486,7 @@ class Session(object):
# within a process. In this way, synchronous usage of the interface
# plays well with asynchronous use. In fact, this produces the
# behavior of only the constructor needing a callback. From then on,
# synchronous usage of the class acts in a greenthread style governed
# synchronous usage of the class acts in a coroutine style governed
# by order of data on the network
self.awaitresponse(retry)
lastresponse = self.lastresponse
@@ -462,8 +462,6 @@ class ConsoleHandler(object):
self._attribwatcher = None
async def get_console_output(self, data):
# Spawn as a greenthread, return control as soon as possible
# to the console object
await self._handle_console_output(data)
async def attachsession(self, session):
@@ -587,7 +587,6 @@ async def _full_updatemacmap(configmanager):
if switch not in switches:
del _macsbyswitch[switch]
switchauth = get_switchcreds(configmanager, switches)
#pool = GreenPool(64)
tsks = []
for sa in switchauth:
tsks.append(_map_switch(sa))
@@ -98,13 +98,6 @@ def get_pci_text_from_ids(subdevice, subvendor, device, vendor):
return vendorstr, devstr
# There is something not right with the RLocks used in pyghmi when
# greenthreads comes into play. It seems like sometimes on acquire,
# it calls _get_ident and it isn't the id(greenlet) and so
# a thread deadlocks itself due to identity crisis?
# However, since we are not really threaded, the operations being protected
# are not actually dangerously multiplexed... so we can replace with
# a null context manager for now
class NullLock(object):
def donothing(self, *args, **kwargs):
+2 -6
View File
@@ -17,6 +17,7 @@
import asyncio
import glob
import os
import subprocess
import shutil
import tempfile
import confluent.sshutil as sshutil
@@ -197,12 +198,7 @@ async def sync_list_to_node(sl, node, suffixes, peerip=None):
targip = peerip
output, stderr = await util.check_output(
'rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip))
except Exception as e:
if 'CalledProcessError' not in repr(e):
# https://github.com/eventlet/eventlet/issues/413
# for some reason, can't catch the calledprocesserror normally
# for this exception, implement a hack workaround
raise
except subprocess.CalledProcessError as e:
unreadablefiles = []
for root, dirnames, filenames in os.walk(targdir):
for filename in filenames:
+32 -27
View File
@@ -30,37 +30,27 @@
# recommend, but hopefully can be useful reference material
import asyncio
import sys
sys.path.append('/opt/confluent/lib/python')
import aiohmi.util.webclient as webclient
import confluent.client as cli
import eventlet.greenpool
import gzip
import io
import json
import os
import struct
import subprocess
import time
webclient = eventlet.import_patched('pyghmi.util.webclient')
bmcsbyuuid = {}
def checkfish(addr, mac):
wc = webclient.SecureHTTPConnection(addr, 443, verifycallback=lambda x: True)
wc.connect()
wc.request('GET', '/redfish/v1')
rsp = wc.getresponse()
body = rsp.read()
if body[:2] == b'\x1f\x8b':
body = gzip.GzipFile(fileobj=io.BytesIO(body)).read()
try:
body = json.loads(body)
except json.decoder.JSONDecodeError:
return
async def checkfish(addr, mac):
wc = webclient.WebConnection(addr, 443, verifycallback=lambda x: True)
body = await wc.grab_json_response('/redfish/v1')
if not body:
return None
uuid = body.get('UUID', None)
if not uuid:
return
return None
#This part is needed if a bmc sticks 'wire format' uuid in the json body
#Should be skipped for bmcs that present it sanely
uuidparts = uuid.split('-')
@@ -68,14 +58,31 @@ def checkfish(addr, mac):
uuidparts[1] = '{:04x}'.format(struct.unpack('!H', struct.pack('<H', int(uuidparts[1], 16)))[0])
uuidparts[2] = '{:04x}'.format(struct.unpack('!H', struct.pack('<H', int(uuidparts[2], 16)))[0])
uuid = '-'.join(uuidparts)
if uuid in bmcsbyuuid:
bmcsbyuuid[uuid]['bmcs'][mac] = addr
else:
bmcsbyuuid[uuid] = {'bmcs': {mac: addr}}
return (uuid, mac, addr)
async def probe_bmcs(mactonode, mactoips):
tasks = []
macs = []
for mac in sorted(mactonode):
tasks.append(checkfish(mactoips[mac], mac))
macs.append(mac)
results = await asyncio.gather(*tasks, return_exceptions=True)
for mac, result in zip(macs, results):
if isinstance(result, Exception):
sys.stderr.write('Failed to probe {}: {}\n'.format(
mactoips[mac], result))
continue
if result is None:
continue
uuid, mac, addr = result
if uuid in bmcsbyuuid:
bmcsbyuuid[uuid]['bmcs'][mac] = addr
else:
bmcsbyuuid[uuid] = {'bmcs': {mac: addr}}
if __name__ == '__main__':
gpool = eventlet.greenpool.GreenPool()
with open('/var/lib/dhcpd/dhcpd.leases', 'r') as leasefile:
leases = leasefile.read()
inlease = False
@@ -95,7 +102,7 @@ if __name__ == '__main__':
currip = None
inlease = False
# warm up arp tables and fdb
pings = {}
pings = {}
for mac in mactoips:
pings[mac] = subprocess.Popen(['ping', '-c', '1', mactoips[mac]], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
for mac in pings:
@@ -115,9 +122,7 @@ if __name__ == '__main__':
for inf in macinfo:
if inf.get('possiblenode', None):
mactonode[mac] = inf['possiblenode']
for mac in sorted(mactonode):
gpool.spawn(checkfish, mactoips[mac], mac)
gpool.waitall()
asyncio.run(probe_bmcs(mactonode, mactoips))
for uuid in sorted(bmcsbyuuid):
macd = bmcsbyuuid[uuid]['bmcs']
macs = sorted(macd)