2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-09 10:21:39 +00:00

Leverage bulk facility

Study of the web interface showed that bulk requests are a key component.

This takes a sensor sweep from about 3 minutes to about 7 seconds in a test environment.
This commit is contained in:
Jarrod Johnson
2026-04-06 10:09:10 -04:00
parent 7bbc451047
commit c53206331c

View File

@@ -122,6 +122,37 @@ class RaritanClient(object):
self._authheader = {'Authorization': 'Basic ' + cred}
return self._authheader
async def bulk_jsonrpc(self, ump):
requests = []
reqmeth = 'performBulk'
reqpath = '/bulk'
reqparms = {'requests': requests}
uribyid = {}
for req in ump:
reqid = self._next_id()
uri, method, params = req
uribyid[reqid] = uri
currreq = {
'rid': uri,
'json': {
'jsonrpc': '2.0',
'method': method,
'id': reqid,
},
}
if params is not None:
currreq['json']['params'] = params
requests.append(currreq)
rsps = await self.jsonrpc(reqpath, reqmeth, reqparms)
rsps = rsps.get('responses', [])
result_by_uri = {}
for rsp in rsps:
uri = uribyid.get(rsp['json']['id'], None)
if uri:
result_by_uri[uri] = rsp['json']['result']
return result_by_uri
async def jsonrpc(self, uri, method, params=None):
reqdata = {
'jsonrpc': '2.0',
@@ -237,47 +268,44 @@ async def _collect_sensor_readings(rc, name, category):
pass
# Outlet sensors
num_outlets = await rc.get_num_outlets()
outlet_by_url = {}
bulkreqs = []
for idx in range(num_outlets):
try:
outlet_meta = await rc.get_outlet_metadata(idx)
outlet_label = outlet_meta.get('label', 'Outlet {}'.format(idx + 1))
except Exception:
outlet_by_url['/model/pdu/0/outlet/{0}'.format(idx)] = idx
bulkreqs.append(('/model/pdu/0/outlet/{0}'.format(idx), 'getSensors', None))
rsps = await rc.bulk_jsonrpc(bulkreqs)
bulkreqs = []
read_requests = []
infobyrid = {}
for url in rsps:
idx = outlet_by_url.get(url, None)
if idx is not None:
outlet_label = 'Outlet {}'.format(idx + 1)
try:
outlet_sensors = await rc.get_outlet_sensors(idx)
except Exception:
continue
for stype, sref in outlet_sensors.items():
if sref is None:
continue
info = _sensor_type_map.get(stype, None)
if not info:
continue
readtype, units, cat = info
if category not in ('all', cat):
continue
myname = outlet_label + ' ' + readtype
if name != 'all' and simplify_name(myname) != name:
continue
sensor_rid = (
sref.get('rid', None) if isinstance(sref, dict) else None
)
if not sensor_rid:
continue
try:
result = await rc.jsonrpc(sensor_rid, 'getReading')
reading = result.get('_ret_', {})
readings.append({
'name': myname,
'value': float(reading.get('value', 0)),
'units': units,
'type': readtype.split()[-1],
})
except Exception:
pass
outlet_sensors = rsps[url].get('_ret_', {})
for stype, sref in outlet_sensors.items():
if sref is None:
continue
info = _sensor_type_map.get(stype, None)
if not info:
continue
readtype, units, cat = info
if category not in ('all', cat):
continue
myname = outlet_label + ' ' + readtype
if name != 'all' and simplify_name(myname) != name:
continue
sensor_rid = (
sref.get('rid', None) if isinstance(sref, dict) else None
)
if not sensor_rid:
continue
infobyrid[sensor_rid] = (myname, units, readtype)
read_requests.append((sensor_rid, 'getReading', None))
# Peripheral device sensors (temperature, humidity, etc.)
try:
slots = await rc.get_peripheral_slots()
bulksettingsrequests = []
bulkdevicerequests = []
for slotref in slots:
slot_rid = (
slotref.get('rid', None)
@@ -285,13 +313,27 @@ async def _collect_sensor_readings(rc, name, category):
)
if not slot_rid:
continue
try:
settings = await rc.jsonrpc(slot_rid, 'getSettings')
settings = settings.get('_ret_', {})
devinfo = await rc.jsonrpc(slot_rid, 'getDevice')
devinfo = devinfo.get('_ret_', {})
except Exception:
bulksettingsrequests.append((slot_rid, 'getSettings', None))
bulkdevicerequests.append((slot_rid, 'getDevice', None))
bulksettings = await rc.bulk_jsonrpc(bulksettingsrequests)
bulkdevices = {}
for setting in bulksettings.values():
setting = setting.get('_ret_', {})
if setting.get('name', '') != '':
break
else:
bulkdevicerequests = []
if bulkdevicerequests:
bulkdevices = await rc.bulk_jsonrpc(bulkdevicerequests)
for slotref in slots:
slot_rid = (
slotref.get('rid', None)
if isinstance(slotref, dict) else None
)
if not slot_rid:
continue
settings = bulksettings.get(slot_rid, {}).get('_ret_', {})
devinfo = bulkdevices.get(slot_rid, {}).get('_ret_', {})
if not devinfo or devinfo.get('device', None) is None:
continue
sensor_name = settings.get('name', 'Peripheral Sensor')
@@ -315,19 +357,20 @@ async def _collect_sensor_readings(rc, name, category):
myname = sensor_name + ' ' + rname
if name != 'all' and simplify_name(myname) != name:
continue
try:
result = await rc.jsonrpc(dev_rid, 'getReading')
reading = result.get('_ret_', {})
readings.append({
'name': myname,
'value': float(reading.get('value', 0)),
'units': units,
'type': rname.split()[-1],
})
except Exception:
pass
infobyrid[dev_rid] = (myname, units, rname)
read_requests.append((dev_rid, 'getReading', None))
except Exception:
pass
rsps = await rc.bulk_jsonrpc(read_requests)
for sensor_rid, result in rsps.items():
reading = result.get('_ret_', {})
myname, units, readtype = infobyrid.get(sensor_rid, ('Unknown', '', ''))
readings.append({
'name': myname,
'value': float(reading.get('value', 0)),
'units': units,
'type': readtype.split()[-1],
})
return readings