2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-03 23:56:29 +00:00

Replace pyghmi with aiohmi in various plugins, remove some eventlet usage

This commit is contained in:
Jarrod Johnson
2026-01-22 14:40:44 -05:00
parent 7984c02042
commit 50e530ebde
8 changed files with 197 additions and 267 deletions

View File

@@ -44,7 +44,8 @@ import time
import struct
import traceback
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
mcastv4addr = '224.0.0.251'
mcastv6addr = 'ff02::fb'

View File

@@ -33,6 +33,7 @@
if __name__ == '__main__':
import sys
import confluent.config.configmanager as cfm
import asyncio
import base64
import confluent.networking.nxapi as nxapi
import confluent.exceptions as exc
@@ -41,11 +42,11 @@ import confluent.messages as msg
import confluent.snmputil as snmp
import confluent.networking.netutil as netutil
import confluent.util as util
import eventlet
from eventlet.greenpool import GreenPool
import eventlet.semaphore
import confluent.tasks as tasks
import re
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
# The interesting OIDs are:
# lldpLocChassisId - to cross reference (1.0.8802.1.1.2.1.3.2.0)
# lldpLocPortId - for cross referencing.. (1.0.8802.1.1.2.1.3.7.1.3)
@@ -180,35 +181,35 @@ def detect_backend(switch, verifier):
backend = _fastbackends.get(switch, None)
if backend:
return backend
wc = webclient.SecureHTTPConnection(
wc = webclient.WebConnection(
switch, 443, verifycallback=verifier, timeout=5)
apicheck, retcode = wc.grab_json_response_with_status('/affluent/')
apicheck, retcode = await wc.grab_json_response_with_status('/affluent/')
if retcode == 401 and apicheck.startswith(b'{}'):
_fastbackends[switch] = 'affluent'
else:
apicheck, retcode = wc.grab_json_response_with_status('/api/')
apicheck, retcode = await wc.grab_json_response_with_status('/api/')
if retcode == 400 and apicheck.startswith(b'{"imdata":['):
_fastbackends[switch] = 'nxapi'
return _fastbackends.get(switch, None)
def _extract_neighbor_data_https(switch, user, password, cfm, lldpdata):
async def _extract_neighbor_data_https(switch, user, password, cfm, lldpdata):
kv = util.TLSCertVerifier(cfm, switch,
'pubkeys.tls_hardwaremanager').verify_cert
backend = detect_backend(switch, kv)
backend = await detect_backend(switch, kv)
if not backend:
raise Exception("No HTTPS backend identified")
wc = webclient.SecureHTTPConnection(
wc = webclient.WebConnection(
switch, 443, verifycallback=kv, timeout=5)
if backend == 'affluent':
return _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc)
elif backend == 'nxapi':
return _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc)
return await _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc)
def _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc):
async def _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc):
cli = nxapi.NxApiClient(switch, user, password, cfm)
lldpinfo = cli.get_lldp()
lldpinfo = await cli.get_lldp()
for port in lldpinfo:
portdata = lldpinfo[port]
peerid = '{0}.{1}'.format(
@@ -221,9 +222,9 @@ def _extract_neighbor_data_nxapi(switch, user, password, cfm, lldpdata, wc):
lldpdata[port] = portdata
_neighdata[switch] = lldpdata
def _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc):
async def _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc):
wc.set_basic_credentials(user, password)
neighdata = wc.grab_json_response('/affluent/lldp/all')
neighdata = await wc.grab_json_response('/affluent/lldp/all')
chassisid = neighdata['chassis']['id']
_chassisidbyswitch[switch] = chassisid,
for record in neighdata['neighbors']:
@@ -250,7 +251,7 @@ def _extract_neighbor_data_affluent(switch, user, password, cfm, lldpdata, wc):
_neighdata[switch] = lldpdata
def _extract_neighbor_data_b(args):
async def _extract_neighbor_data_b(args):
"""Build LLDP data about elements connected to switch
args are carried as a tuple, because of eventlet convenience
@@ -268,7 +269,7 @@ def _extract_neighbor_data_b(args):
return
lldpdata = {'!!vintage': now}
try:
return _extract_neighbor_data_https(switch, user, password, cfm, lldpdata)
return await _extract_neighbor_data_https(switch, user, password, cfm, lldpdata)
except Exception as e:
pass
conn = snmp.Session(switch, password, user, privacy_protocol=privproto)
@@ -327,19 +328,19 @@ def _extract_neighbor_data_b(args):
_neighdata[switch] = lldpdata
def update_switch_data(switch, configmanager, force=False, retexc=False):
async def update_switch_data(switch, configmanager, force=False, retexc=False):
switchcreds = netutil.get_switchcreds(configmanager, (switch,))[0]
ndr = _extract_neighbor_data(switchcreds + (force, retexc))
ndr = await _extract_neighbor_data(switchcreds + (force, retexc))
if retexc and isinstance(ndr, Exception):
raise ndr
return _neighdata.get(switch, {})
def update_neighbors(configmanager, force=False, retexc=False):
return _update_neighbors_backend(configmanager, force, retexc)
async def update_neighbors(configmanager, force=False, retexc=False):
return await _update_neighbors_backend(configmanager, force, retexc)
def _update_neighbors_backend(configmanager, force, retexc):
async def _update_neighbors_backend(configmanager, force, retexc):
global _neighdata
global _neighbypeerid
vintage = _neighdata.get('!!vintage', 0)
@@ -351,23 +352,22 @@ def _update_neighbors_backend(configmanager, force, retexc):
switches = netutil.list_switches(configmanager)
switchcreds = netutil.get_switchcreds(configmanager, switches)
switchcreds = [ x + (force, retexc) for x in switchcreds]
pool = GreenPool(64)
for ans in pool.imap(_extract_neighbor_data, switchcreds):
async for ans in tasks.task_imap(_extract_neighbor_data, switchcreds, max_concurrent=64):
yield ans
def _extract_neighbor_data(args):
async def _extract_neighbor_data(args):
# single switch neighbor data update
switch = args[0]
if switch not in _updatelocks:
_updatelocks[switch] = eventlet.semaphore.Semaphore()
_updatelocks[switch] = asyncio.Semaphore()
if _updatelocks[switch].locked():
while _updatelocks[switch].locked():
eventlet.sleep(1)
await asyncio.sleep(1)
return
try:
with _updatelocks[switch]:
return _extract_neighbor_data_b(args)
async with _updatelocks[switch]:
return await _extract_neighbor_data_b(args)
except Exception as e:
yieldexc = False
if len(args) >= 7:
@@ -382,6 +382,7 @@ if __name__ == '__main__':
# (should do three argument form for snmpv3 test
import sys
_extract_neighbor_data((sys.argv[1], sys.argv[2], None, True))
asyncio.run(_extract_neighbor_data((sys.argv[1], sys.argv[2], None, True)))
print(repr(_neighdata))

View File

@@ -23,10 +23,10 @@
# - One power supply is off.
import eventlet
import eventlet.queue as queue
import asyncio
from confluent_server.confluent import tasks
import confluent.exceptions as exc
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
import confluent.messages as msg
import confluent.util as util
@@ -38,92 +38,39 @@ class SwitchSensor(object):
self.health = health
def cnos_login(node, configmanager, creds):
wc = webclient.SecureHTTPConnection(node, port=443, verifycallback=util.TLSCertVerifier(
async def cnos_login(node, configmanager, creds):
wc = webclient.WebConnection(node, port=443, verifycallback=util.TLSCertVerifier(
configmanager, node, 'pubkeys.tls_hardwaremanager').verify_cert)
wc.set_basic_credentials(creds[node]['secret.hardwaremanagementuser']['value'], creds[node]['secret.hardwaremanagementpassword']['value'])
wc.request('GET', '/nos/api/login/')
rsp = wc.getresponse()
body = rsp.read()
if rsp.status == 401: # CNOS gives 401 on first attempt...
wc.request('GET', '/nos/api/login/')
rsp = wc.getresponse()
body = rsp.read()
if rsp.status >= 200 and rsp.status < 300:
body, status, headers = await wc.grab_response_with_status('/nos/api/login/')
if status == 401: # CNOS gives 401 on first attempt...
body, status, headers = await wc.grab_response_with_status('/nos/api/login/')
if status >= 200 and status < 300:
return wc
raise exc.TargetEndpointBadCredentials('Unable to authenticate')
def update(nodes, element, configmanager, inputdata):
async def update(nodes, element, configmanager, inputdata):
for node in nodes:
yield msg.ConfluentNodeError(node, 'Not Implemented')
def delete(nodes, element, configmanager, inputdata):
async def delete(nodes, element, configmanager, inputdata):
for node in nodes:
yield msg.ConfluentNodeError(node, 'Not Implemented')
def create(nodes, element, configmanager, inputdata):
async def create(nodes, element, configmanager, inputdata):
for node in nodes:
yield msg.ConfluentNodeError(node, 'Not Implemented')
def retrieve(nodes, element, configmanager, inputdata):
results = queue.LightQueue()
workers = set([])
if element == ['power', 'state']:
for node in nodes:
yield msg.PowerState(node=node, state='on')
return
elif element == ['health', 'hardware']:
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(eventlet.spawn(retrieve_health, configmanager, creds,
node, results))
elif element[:3] == ['inventory', 'hardware', 'all']:
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(eventlet.spawn(retrieve_inventory, configmanager,
creds, node, results, element))
elif element[:3] == ['inventory', 'firmware', 'all']:
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(eventlet.spawn(retrieve_firmware, configmanager,
creds, node, results, element))
else:
for node in nodes:
yield msg.ConfluentNodeError(node, 'Not Implemented')
return
currtimeout = 10
while workers:
try:
datum = results.get(10)
while datum:
if datum:
yield datum
datum = results.get_nowait()
except queue.Empty:
pass
eventlet.sleep(0.001)
for t in list(workers):
if t.dead:
workers.discard(t)
try:
while True:
datum = results.get_nowait()
if datum:
yield datum
except queue.Empty:
pass
def retrieve_inventory(configmanager, creds, node, results, element):
async def retrieve_inventory(configmanager, creds, node, results, element):
if len(element) == 3:
results.put(msg.ChildCollection('all'))
results.put(msg.ChildCollection('system'))
await results.put(msg.ChildCollection('all'))
await results.put(msg.ChildCollection('system'))
return
wc = cnos_login(node, configmanager, creds)
sysinfo = wc.grab_json_response('/nos/api/sysinfo/inventory')
wc = await cnos_login(node, configmanager, creds)
sysinfo = await wc.grab_json_response('/nos/api/sysinfo/inventory')
invinfo = {
'inventory': [{
'name': 'System',
@@ -138,39 +85,39 @@ def retrieve_inventory(configmanager, creds, node, results, element):
}
}]
}
results.put(msg.KeyValueData(invinfo, node))
await results.put(msg.KeyValueData(invinfo, node))
def retrieve_firmware(configmanager, creds, node, results, element):
async def retrieve_firmware(configmanager, creds, node, results, element):
if len(element) == 3:
results.put(msg.ChildCollection('all'))
await results.put(msg.ChildCollection('all'))
return
wc = cnos_login(node, configmanager, creds)
sysinfo = wc.grab_json_response('/nos/api/sysinfo/inventory')
wc = await cnos_login(node, configmanager, creds)
sysinfo = await wc.grab_json_response('/nos/api/sysinfo/inventory')
items = [{
'Software': {'version': sysinfo['Software Revision']},
},
{
'BIOS': {'version': sysinfo['BIOS Revision']},
}]
results.put(msg.Firmware(items, node))
await results.put(msg.Firmware(items, node))
def retrieve_health(configmanager, creds, node, results):
wc = cnos_login(node, configmanager, creds)
hinfo = wc.grab_json_response('/nos/api/sysinfo/globalhealthstatus')
async def retrieve_health(configmanager, creds, node, results):
wc = await cnos_login(node, configmanager, creds)
hinfo = await wc.grab_json_response('/nos/api/sysinfo/globalhealthstatus')
summary = hinfo['status'].lower()
if summary == 'noncritical':
summary = 'warning'
results.put(msg.HealthSummary(summary, name=node))
await results.put(msg.HealthSummary(summary, name=node))
state = None
badreadings = []
if summary != 'ok': # temperature or dump or fans or psu
wc.grab_json_response('/nos/api/sysinfo/panic_dump')
switchinfo = wc.grab_json_response('/nos/api/sysinfo/panic_dump')
await wc.grab_json_response('/nos/api/sysinfo/panic_dump')
switchinfo = await wc.grab_json_response('/nos/api/sysinfo/panic_dump')
if switchinfo:
badreadings.append(
SwitchSensor('Panicdump', ['Present'], health='warning'))
switchinfo = wc.grab_json_response('/nos/api/sysinfo/temperatures')
switchinfo = await wc.grab_json_response('/nos/api/sysinfo/temperatures')
for temp in switchinfo:
if temp == 'Temperature threshold':
continue
@@ -181,17 +128,68 @@ def retrieve_health(configmanager, creds, node, results):
tempval = switchinfo[temp]['Temp']
badreadings.append(
SwitchSensor(temp, [], value=tempval, health=temphealth))
switchinfo = wc.grab_json_response('/nos/api/sysinfo/fans')
switchinfo = await wc.grab_json_response('/nos/api/sysinfo/fans')
for fan in switchinfo:
if switchinfo[fan]['speed-rpm'] < 100:
badreadings.append(
SwitchSensor(fan, [], value=switchinfo[fan]['speed-rpm'],
health='critical'))
switchinfo = wc.grab_json_response('/nos/api/sysinfo/power')
switchinfo = await wc.grab_json_response('/nos/api/sysinfo/power')
for psu in switchinfo:
if switchinfo[psu]['State'] != 'Normal ON':
psuname = switchinfo[psu]['Name']
badreadings.append(
SwitchSensor(psuname, states=[switchinfo[psu]['State']],
health='critical'))
results.put(msg.SensorReadings(badreadings, name=node))
await results.put(msg.SensorReadings(badreadings, name=node))
async def retrieve(nodes, element, configmanager, inputdata):
results = asyncio.Queue()
workers = set([])
if element == ['power', 'state']:
for node in nodes:
yield msg.PowerState(node=node, state='on')
return
elif element == ['health', 'hardware']:
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(tasks.spawn(retrieve_health(configmanager, creds,
node, results))
elif element[:3] == ['inventory', 'hardware', 'all']:
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(tasks.spawn(retrieve_inventory(configmanager,
creds, node, results, element)))
elif element[:3] == ['inventory', 'firmware', 'all']:
creds = configmanager.get_node_attributes(
nodes, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
for node in nodes:
workers.add(tasks.spawn(retrieve_firmware(configmanager,
creds, node, results, element)))
else:
for node in nodes:
yield msg.ConfluentNodeError(node, 'Not Implemented')
return
currtimeout = 10
while workers:
try:
datum = await asyncio.wait_for(results.get(), timeout=10)
while datum:
if datum:
yield datum
datum = results.get_nowait()
except asyncio.QueueEmpty:
pass
await asyncio.sleep(0.001)
for t in list(workers):
if t.done():
workers.discard(t)
try:
while True:
datum = results.get_nowait()
if datum:
yield datum
except asyncio.QueueEmpty:
pass

View File

@@ -16,17 +16,9 @@ from xml.etree.ElementTree import fromstring as rfromstring
import confluent.util as util
import confluent.messages as msg
import confluent.exceptions as exc
import eventlet.green.time as time
import eventlet.green.socket as socket
import eventlet.greenpool as greenpool
import eventlet
wc = eventlet.import_patched('pyghmi.util.webclient')
try:
import Cookie
httplib = eventlet.import_patched('httplib')
except ImportError:
httplib = eventlet.import_patched('http.client')
import http.cookies as Cookie
import confluent.tasks as tasks
import aiohmi.util.webclient as wc
import time
sensorsbymodel = {
'FS1350': ['alarms', 'dt', 'duty', 'dw', 'mode', 'p3state', 'primflow', 'ps1', 'ps1a', 'ps1b', 'ps2', 'ps3', 'ps4', 'ps5a', 'ps5b', 'ps5c', 'pumpspeed1', 'pumpspeed2', 'pumpspeed3', 'rh', 'sdp', 'secflow', 'setpoint', 't1', 't2', 't2a', 't2b', 't2c', 't3', 't3', 't4', 't5', 'valve', 'valve2'],
@@ -104,81 +96,6 @@ def simplify_name(name):
return name.lower().replace(' ', '_').replace('/', '-').replace(
'_-_', '-')
pdupool = greenpool.GreenPool(128)
class WebResponse(httplib.HTTPResponse):
def _check_close(self):
return True
class WebConnection(wc.SecureHTTPConnection):
response_class = WebResponse
def __init__(self, host, secure, verifycallback):
if secure:
port = 443
else:
port = 80
wc.SecureHTTPConnection.__init__(self, host, port, verifycallback=verifycallback)
self.secure = secure
self.cookies = {}
def connect(self):
if self.secure:
return super(WebConnection, self).connect()
addrinfo = socket.getaddrinfo(self.host, self.port)[0]
# workaround problems of too large mtu, moderately frequent occurance
# in this space
plainsock = socket.socket(addrinfo[0])
plainsock.settimeout(self.mytimeout)
try:
plainsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_MAXSEG, 1456)
except socket.error:
pass
plainsock.connect(addrinfo[4])
self.sock = plainsock
def getresponse(self):
try:
rsp = super(WebConnection, self).getresponse()
try:
hdrs = [x.split(':', 1) for x in rsp.msg.headers]
except AttributeError:
hdrs = rsp.msg.items()
for hdr in hdrs:
if hdr[0] == 'Set-Cookie':
c = Cookie.BaseCookie(hdr[1])
for k in c:
self.cookies[k] = c[k].value
except httplib.BadStatusLine:
self.broken = True
raise
return rsp
def request(self, method, url, body=None):
headers = {}
if body:
headers['Content-Length'] = len(body)
cookies = []
for cookie in self.cookies:
cookies.append('{0}={1}'.format(cookie, self.cookies[cookie]))
headers['Cookie'] = ';'.join(cookies)
headers['Host'] = 'pdu.cluster.net'
headers['Accept'] = '*/*'
headers['Accept-Language'] = 'en-US,en;q=0.9'
headers['Connection'] = 'close'
headers['Referer'] = 'http://pdu.cluster.net/setting_admin.htm'
return super(WebConnection, self).request(method, url, body, headers)
def grab_response(self, url, body=None, method=None):
if method is None:
method = 'GET' if body is None else 'POST'
if body:
self.request(method, url, body)
else:
self.request(method, url)
rsp = self.getresponse()
body = rsp.read()
return body, rsp.status
class CoolteraClient(object):
def __init__(self, cdu, configmanager):
self.node = cdu
@@ -201,7 +118,7 @@ class CoolteraClient(object):
cv = util.TLSCertVerifier(
self.configmanager, self.node,
'pubkeys.tls_hardwaremanager').verify_cert
self._wc = WebConnection(target, secure=True, verifycallback=cv)
self._wc = wc.WebConnection(target, 443, verifycallback=cv)
return self._wc
@@ -231,14 +148,12 @@ def xml2stateinfo(statdata):
_sensors_by_node = {}
def read_sensors(element, node, configmanager):
async def read_sensors(element, node, configmanager):
category, name = element[-2:]
justnames = False
if len(element) == 3:
# just get names
category = name
name = 'all'
justnames = True
for sensor in sensors:
yield msg.ChildCollection(simplify_name(sensors[sensor][0]))
return
@@ -247,9 +162,7 @@ def read_sensors(element, node, configmanager):
sn = _sensors_by_node.get(node, None)
if not sn or sn[1] < time.time():
cc = CoolteraClient(node, configmanager)
cc.wc.request('GET', '/status.xml')
rsp = cc.wc.getresponse()
statdata = rsp.read()
statdata, status, hdrs = await cc.wc.grab_response_with_status('/status.xml')
statinfo = xml2stateinfo(statdata)
_sensors_by_node[node] = (statinfo, time.time() + 1)
sn = _sensors_by_node.get(node, None)
@@ -257,12 +170,13 @@ def read_sensors(element, node, configmanager):
yield msg.SensorReadings(sn[0], name=node)
def retrieve(nodes, element, configmanager, inputdata):
async def retrieve(nodes, element, configmanager, inputdata):
if element[0] == 'sensors':
gp = greenpool.GreenPile(pdupool)
taskargs = []
for node in nodes:
gp.spawn(read_sensors, element, node, configmanager)
for rsp in gp:
taskargs.append((element, node, configmanager))
gp = tasks.starmap(read_sensors, taskargs)
async for rsp in gp:
for datum in rsp:
yield datum
return

View File

@@ -13,7 +13,7 @@
# limitations under the License.
import confluent.core as core
import confluent.messages as msg
import pyghmi.exceptions as pygexc
import aiohmi.exceptions as pygexc
import confluent.exceptions as exc

View File

@@ -25,7 +25,6 @@ import re
import eventlet
import eventlet.queue as queue
import confluent.exceptions as exc
webclient = eventlet.import_patched("pyghmi.util.webclient")
import confluent.messages as msg
import confluent.util as util
import confluent.plugins.shell.ssh as ssh

View File

@@ -1,18 +1,12 @@
import asyncio
import confluent.vinzmanager as vinzmanager
import codecs
import confluent.util as util
import confluent.messages as msg
import eventlet
import json
import struct
webclient = eventlet.import_patched('pyghmi.util.webclient')
import eventlet.green.socket as socket
import eventlet
import aiohmi.util.webclient as webclient
import confluent.interface.console as conapi
import io
import urllib.parse as urlparse
import eventlet.green.ssl as ssl
import eventlet
@@ -211,20 +205,20 @@ class PmxApiClient:
except Exception:
pass
self.server = server
self.wc = webclient.SecureHTTPConnection(server, port=8006, verifycallback=cv)
self.wc = webclient.WebConnection(server, port=8006, verifycallback=cv)
self.fprint = configmanager.get_node_attributes(server, 'pubkeys.tls').get(server, {}).get('pubkeys.tls', {}).get('value', None)
self.vmmap = {}
self.login()
self.vmlist = {}
self.vmbyid = {}
def login(self):
async def login(self):
loginform = {
'username': self.user,
'password': self.password,
}
loginbody = urlparse.urlencode(loginform)
rsp = self.wc.grab_json_response_with_status('/api2/json/access/ticket', loginbody)
rsp = await self.wc.grab_json_response_with_status('/api2/json/access/ticket', loginbody)
self.wc.cookies['PVEAuthCookie'] = rsp[0]['data']['ticket']
self.pac = rsp[0]['data']['ticket']
self.wc.set_header('CSRFPreventionToken', rsp[0]['data']['CSRFPreventionToken'])
@@ -233,23 +227,23 @@ class PmxApiClient:
def get_screenshot(self, vm, outfile):
raise Exception("Not implemented")
def map_vms(self):
rsp = self.wc.grab_json_response('/api2/json/cluster/resources')
async def map_vms(self):
rsp = await self.wc.grab_json_response('/api2/json/cluster/resources')
for datum in rsp.get('data', []):
if datum['type'] == 'qemu':
self.vmmap[datum['name']] = (datum['node'], datum['id'])
return self.vmmap
def get_vm(self, vm):
async def get_vm(self, vm):
if vm not in self.vmmap:
self.map_vms()
await self.map_vms()
return self.vmmap[vm]
def get_vm_inventory(self, vm):
host, guest = self.get_vm(vm)
cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
async def get_vm_inventory(self, vm):
host, guest = await self.get_vm(vm)
cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
myuuid = None
sysinfo = {'name': 'System', 'present': True, 'information': {
'Product name': 'Proxmox qemu virtual machine',
@@ -281,15 +275,15 @@ class PmxApiClient:
yield msg.KeyValueData({'inventory': invitems}, vm)
def get_vm_ikvm(self, vm):
return self.get_vm_consproxy(vm, 'vnc')
async def get_vm_ikvm(self, vm):
return await self.get_vm_consproxy(vm, 'vnc')
def get_vm_serial(self, vm):
return self.get_vm_consproxy(vm, 'term')
async def get_vm_serial(self, vm):
return await self.get_vm_consproxy(vm, 'term')
def get_vm_consproxy(self, vm, constype):
host, guest = self.get_vm(vm)
rsp = self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/{constype}proxy', method='POST')
async def get_vm_consproxy(self, vm, constype):
host, guest = await self.get_vm(vm)
rsp = await self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/{constype}proxy', method='POST')
consdata = rsp[0]['data']
consdata['server'] = self.server
consdata['host'] = host
@@ -297,9 +291,9 @@ class PmxApiClient:
consdata['pac'] = self.pac
return consdata
def get_vm_bootdev(self, vm):
host, guest = self.get_vm(vm)
cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
async def get_vm_bootdev(self, vm):
host, guest = await self.get_vm(vm)
cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
for datum in cfg['data']:
if datum['key'] == 'boot':
bootseq = datum.get('pending', datum['value'])
@@ -312,9 +306,9 @@ class PmxApiClient:
return 'default'
def get_vm_power(self, vm):
host, guest = self.get_vm(vm)
rsp = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/status/current')
async def get_vm_power(self, vm):
host, guest = await self.get_vm(vm)
rsp = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/status/current')
rsp = rsp['data']
currstatus = rsp["qmpstatus"] # stopped, "running"
if currstatus == 'running':
@@ -323,15 +317,15 @@ class PmxApiClient:
return 'off'
raise Exception("Unknnown response to status query")
def set_vm_power(self, vm, state):
host, guest = self.get_vm(vm)
async def set_vm_power(self, vm, state):
host, guest = await self.get_vm(vm)
current = None
newstate = ''
targstate = state
if targstate == 'boot':
targstate = 'on'
if state == 'boot':
current = self.get_vm_power(vm)
current = await self.get_vm_power(vm)
if current == 'on':
state = 'reset'
newstate = 'reset'
@@ -342,27 +336,27 @@ class PmxApiClient:
elif state == 'off':
state = 'stop'
if state == 'reset': # check for pending config
cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
for datum in cfg['data']:
if datum['key'] == 'boot' and 'pending' in datum:
self.set_vm_power(vm, 'off')
self.set_vm_power(vm, 'on')
await self.set_vm_power(vm, 'off')
await self.set_vm_power(vm, 'on')
state = ''
newstate = 'reset'
if state:
rsp = self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/status/{state}', method='POST')
rsp = await self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/status/{state}', method='POST')
if state and state != 'reset':
newstate = self.get_vm_power(vm)
newstate = await self.get_vm_power(vm)
while newstate != targstate:
eventlet.sleep(0.1)
newstate = self.get_vm_power(vm)
await asyncio.sleep(0.1)
newstate = await self.get_vm_power(vm)
return newstate, current
def set_vm_bootdev(self, vm, bootdev):
host, guest = self.get_vm(vm)
async def set_vm_bootdev(self, vm, bootdev):
host, guest = await self.get_vm(vm)
if bootdev not in ('net', 'network', 'default'):
raise Exception('Requested boot device not supported')
cfg = self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
cfg = await self.wc.grab_json_response(f'/api2/json/nodes/{host}/{guest}/pending')
nonnetdevs = []
netdevs = []
for datum in cfg['data']:
@@ -383,7 +377,7 @@ class PmxApiClient:
neworder = 'order=' + ';'.join(newbootdevs)
self.wc.set_header('Content-Type', 'application/json')
try:
self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/config', {'boot': neworder}, method='PUT')
await self.wc.grab_json_response_with_status(f'/api2/json/nodes/{host}/{guest}/config', {'boot': neworder}, method='PUT')
finally:
del self.wc.stdheaders['Content-Type']
@@ -420,16 +414,16 @@ def retrieve(nodes, element, configmanager, inputdata):
# good background for the webui, and kitty
yield msg.ConfluentNodeError(node, "vnc available, screenshot not available")
def update(nodes, element, configmanager, inputdata):
async def update(nodes, element, configmanager, inputdata):
clientsbynode = prep_proxmox_clients(nodes, configmanager)
for node in nodes:
currclient = clientsbynode[node]
if element == ['power', 'state']:
newstate, oldstate = currclient.set_vm_power(node, inputdata.powerstate(node))
newstate, oldstate = await currclient.set_vm_power(node, inputdata.powerstate(node))
yield msg.PowerState(node, newstate, oldstate)
elif element == ['boot', 'nextdevice']:
currclient.set_vm_bootdev(node, inputdata.bootdevice(node))
yield msg.BootDevice(node, currclient.get_vm_bootdev(node))
await currclient.set_vm_bootdev(node, inputdata.bootdevice(node))
yield msg.BootDevice(node, await currclient.get_vm_bootdev(node))
elif element == ['console', 'ikvm']:
try:
currclient = clientsbynode[node]
@@ -441,7 +435,7 @@ def update(nodes, element, configmanager, inputdata):
return
# assume this is only console for now
def create(nodes, element, configmanager, inputdata):
async def create(nodes, element, configmanager, inputdata):
clientsbynode = prep_proxmox_clients(nodes, configmanager)
for node in nodes:
if element == ['console', 'ikvm']:
@@ -453,7 +447,7 @@ def create(nodes, element, configmanager, inputdata):
return
yield msg.ChildCollection(url)
return
serialdata = clientsbynode[node].get_vm_serial(node)
serialdata = await clientsbynode[node].get_vm_serial(node)
yield PmxConsole(serialdata, node, configmanager, clientsbynode[node])
return

View File

@@ -26,6 +26,28 @@ tsks = {}
tasksitter = None
logtrace = None
async def task_starmap(coro, iterable, max_concurrent=256):
semaphore = asyncio.Semaphore(max_concurrent)
async def sem_coro(*args):
async with semaphore:
return await coro(*args)
tasks = [asyncio.create_task(sem_coro(*item)) for item in iterable]
for task in asyncio.as_completed(tasks):
yield await task
async def task_imap(coro, iterable, max_concurrent=256):
semaphore = asyncio.Semaphore(max_concurrent)
async def sem_coro(item):
async with semaphore:
return await coro(item)
tasks = [asyncio.create_task(sem_coro(item)) for item in iterable]
for task in asyncio.as_completed(tasks):
yield await task
async def _sit_tasks():
while True:
while not tsks:
@@ -60,6 +82,7 @@ def spawn(coro):
while tskid in tsks:
tskid = random.random()
tsks[tskid] = spawn_task(coro)
return tsks[tskid]
async def _sleep_and_run(sleeptime, func, args):