2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-07-03 14:32:14 +00:00

Convert affluent method from eventlet

This commit is contained in:
Jarrod Johnson
2024-08-16 15:17:06 -04:00
parent 5eaf998391
commit 708170b06a
@@ -14,11 +14,10 @@
# limitations under the License.
import eventlet
import eventlet.queue as queue
import eventlet.green.socket as socket
import asyncio
import socket
import confluent.exceptions as exc
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
import confluent.messages as msg
import confluent.util as util
@@ -32,38 +31,38 @@ class SwitchSensor(object):
class WebClient(object):
def __init__(self, node, configmanager, creds):
self.node = node
self.wc = webclient.SecureHTTPConnection(node, port=443, verifycallback=util.TLSCertVerifier(
self.wc = webclient.WebConnection(node, port=443, verifycallback=util.TLSCertVerifier(
configmanager, node, 'pubkeys.tls_hardwaremanager').verify_cert)
self.wc.set_basic_credentials(creds[node]['secret.hardwaremanagementuser']['value'], creds[node]['secret.hardwaremanagementpassword']['value'])
def fetch(self, url, results):
async def fetch(self, url, results):
try:
rsp, status = self.wc.grab_json_response_with_status(url)
rsp, status = await self.wc.grab_json_response_with_status(url)
except exc.PubkeyInvalid:
results.put(msg.ConfluentNodeError(self.node,
results.put_nowait(msg.ConfluentNodeError(self.node,
'Mismatch detected between '
'target certificate fingerprint and '
'pubkeys.tls_hardwaremanager attribute'))
return {}
except (socket.gaierror, socket.herror, TimeoutError) as e:
results.put(msg.ConfluentTargetTimeout(self.node, str(e)))
results.put_nowait(msg.ConfluentTargetTimeout(self.node, str(e)))
return {}
except OSError as e:
if e.errno == 113:
results.put(msg.ConfluentTargetTimeout(self.node))
results.put_nowait(msg.ConfluentTargetTimeout(self.node))
else:
results.put(msg.ConfluentTargetTimeout(self.node), str(e))
results.put_nowait(msg.ConfluentTargetTimeout(self.node), str(e))
return {}
except Exception as e:
results.put(msg.ConfluentNodeError(self.node,
results.put_nowait(msg.ConfluentNodeError(self.node,
repr(e)))
return {}
if status == 401:
results.put(msg.ConfluentTargetInvalidCredentials(self.node, 'Unable to authenticate'))
results.put_nowait(msg.ConfluentTargetInvalidCredentials(self.node, 'Unable to authenticate'))
return {}
elif status != 200:
#must be str not bytes
results.put(msg.ConfluentNodeError(self.node, 'Unknown error: {} while retrieving {}'.format(rsp, url)))
results.put_nowait(msg.ConfluentNodeError(self.node, 'Unknown error: {} while retrieving {}'.format(rsp, url)))
return {}
return rsp
@@ -121,11 +120,11 @@ def _run_method(method, workers, results, configmanager, nodes, element):
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(eventlet.spawn(method, configmanager, creds,
node, results, element))
workers.add(util.spawn(method(configmanager, creds,
node, results, element)))
def retrieve(nodes, element, configmanager, inputdata):
results = queue.LightQueue()
async def retrieve(nodes, element, configmanager, inputdata):
results = asyncio.Queue()
workers = set([])
if element == ['power', 'state']:
_run_method(retrieve_power, workers, results, configmanager, nodes, element)
@@ -145,69 +144,69 @@ def retrieve(nodes, element, configmanager, inputdata):
return
while workers:
try:
datum = results.get(block=True, timeout=10)
datum = await asyncio.wait_for(results.get(), 10.0)
while datum:
if datum:
yield datum
datum = results.get_nowait()
except queue.Empty:
except asyncio.QueueEmpty:
pass
eventlet.sleep(0.001)
await asyncio.sleep(0)
for t in list(workers):
if t.dead:
if t.done():
workers.discard(t)
try:
while True:
datum = results.get_nowait()
if datum:
yield datum
except queue.Empty:
except asyncio.QueueEmpty:
pass
def retrieve_inventory(configmanager, creds, node, results, element):
async def retrieve_inventory(configmanager, creds, node, results, element):
if len(element) == 3:
results.put(msg.ChildCollection('all'))
results.put(msg.ChildCollection('system'))
results.put_nowait(msg.ChildCollection('all'))
results.put_nowait(msg.ChildCollection('system'))
return
wc = WebClient(node, configmanager, creds)
invinfo = wc.fetch('/affluent/inventory/hardware/all', results)
invinfo = await wc.fetch('/affluent/inventory/hardware/all', results)
if invinfo:
results.put(msg.KeyValueData(invinfo, node))
results.put_nowait(msg.KeyValueData(invinfo, node))
def retrieve_firmware(configmanager, creds, node, results, element):
async def retrieve_firmware(configmanager, creds, node, results, element):
if len(element) == 3:
results.put(msg.ChildCollection('all'))
results.put_nowait(msg.ChildCollection('all'))
return
wc = WebClient(node, configmanager, creds)
fwinfo = wc.fetch('/affluent/inventory/firmware/all', results)
fwinfo = await wc.fetch('/affluent/inventory/firmware/all', results)
if fwinfo:
results.put(msg.Firmware(fwinfo, node))
results.put_nowait(msg.Firmware(fwinfo, node))
def list_sensors(configmanager, creds, node, results, element):
async def list_sensors(configmanager, creds, node, results, element):
wc = WebClient(node, configmanager, creds)
sensors = wc.fetch('/affluent/sensors/hardware/all', results)
sensors = await wc.fetch('/affluent/sensors/hardware/all', results)
for sensor in sensors['item']:
results.put(msg.ChildCollection(sensor))
results.put_nowait(msg.ChildCollection(sensor))
def retrieve_sensors(configmanager, creds, node, results, element):
async def retrieve_sensors(configmanager, creds, node, results, element):
wc = WebClient(node, configmanager, creds)
sensors = wc.fetch('/affluent/sensors/hardware/all/{0}'.format(element[-1]), results)
sensors = await wc.fetch('/affluent/sensors/hardware/all/{0}'.format(element[-1]), results)
if sensors:
results.put(msg.SensorReadings(sensors['sensors'], node))
results.put_nowait(msg.SensorReadings(sensors['sensors'], node))
def retrieve_power(configmanager, creds, node, results, element):
async def retrieve_power(configmanager, creds, node, results, element):
wc = WebClient(node, configmanager, creds)
hinfo = wc.fetch('/affluent/health', results)
hinfo = await wc.fetch('/affluent/health', results)
if hinfo:
results.put(msg.PowerState(node=node, state='on'))
results.put_nowait(msg.PowerState(node=node, state='on'))
def retrieve_health(configmanager, creds, node, results, element):
async def retrieve_health(configmanager, creds, node, results, element):
wc = WebClient(node, configmanager, creds)
hinfo = wc.fetch('/affluent/health', results)
hinfo = await wc.fetch('/affluent/health', results)
if hinfo:
results.put(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node))
results.put(msg.SensorReadings(hinfo.get('sensors', []), name=node))
results.put_nowait(msg.HealthSummary(hinfo.get('health', 'unknown'), name=node))
results.put_nowait(msg.SensorReadings(hinfo.get('sensors', []), name=node))