2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-24 19:29:28 +00:00

Have slp mostly work

Advance the SLP discovery code and core discovery
to mostly work.
This commit is contained in:
Jarrod Johnson
2024-05-07 17:02:51 -04:00
parent 2089f5e7e6
commit 23658680a5
9 changed files with 285 additions and 236 deletions

View File

@@ -61,6 +61,7 @@
# retry until uppercase, lowercase, digit, and symbol all present)
# - Apply defined configuration to endpoint
import asyncio
import base64
import confluent.config.configmanager as cfm
import confluent.collective.manager as collective
@@ -80,20 +81,21 @@ import confluent.messages as msg
import confluent.networking.macmap as macmap
import confluent.noderange as noderange
import confluent.util as util
import inspect
import json
import eventlet
import traceback
import shlex
import struct
import eventlet.green.socket as socket
#import eventlet.green.socket as socket
import socket
import socket as nsocket
import eventlet.green.subprocess as subprocess
webclient = eventlet.import_patched('pyghmi.util.webclient')
import subprocess
#import eventlet.green.subprocess as subprocess
#webclient = eventlet.import_patched('pyghmi.util.webclient')
import eventlet
import eventlet.greenpool
import eventlet.semaphore
autosensors = set()
scanner = None
@@ -153,7 +155,6 @@ servicebyname = {
'lenovo-tsm': 'service:lenovo-tsm',
}
discopool = eventlet.greenpool.GreenPool(500)
runningevals = {}
# Passive-only auto-detection protocols:
# PXE
@@ -512,8 +513,8 @@ def save_subscriptions(subs):
dso.write(json.dumps(subs))
def register_remote_addrs(addresses, configmanager):
def register_remote_addr(addr):
async def register_remote_addrs(addresses, configmanager):
async def register_remote_addr(addr):
nd = {
'addresses': [(addr, 443)]
}
@@ -522,12 +523,13 @@ def register_remote_addrs(addresses, configmanager):
return addr, False
sd['hwaddr'] = sd['attributes']['mac-address']
nh = xcc.NodeHandler(sd, configmanager)
nh.scan()
detected(nh.info)
await nh.scan()
await detected(nh.info)
return addr, True
rpool = eventlet.greenpool.GreenPool(512)
#rpool = eventlet.greenpool.GreenPool(512)
for count in iterate_addrs(addresses, True):
yield msg.ConfluentResourceCount(count)
return # ASYNC
for result in rpool.imap(register_remote_addr, iterate_addrs(addresses)):
if result[1]:
yield msg.CreatedResource(result[0])
@@ -535,7 +537,7 @@ def register_remote_addrs(addresses, configmanager):
yield msg.ConfluentResourceNotFound(result[0])
def handle_api_request(configmanager, inputdata, operation, pathcomponents):
async def handle_api_request(configmanager, inputdata, operation, pathcomponents):
if pathcomponents == ['discovery', 'autosense']:
return handle_autosense_config(operation, inputdata)
if operation == 'retrieve' and pathcomponents[:2] == ['discovery', 'subscriptions']:
@@ -549,7 +551,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents):
pathcomponents == ['discovery', 'rescan']):
if inputdata != {'rescan': 'start'}:
raise exc.InvalidArgumentException()
rescan()
await rescan()
return (msg.KeyValueData({'rescan': 'started'}),)
elif operation in ('update', 'create') and pathcomponents[:2] == ['discovery', 'subscriptions']:
target = pathcomponents[2]
@@ -570,7 +572,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents):
if pathcomponents == ['discovery', 'register']:
if 'addresses' not in inputdata:
raise exc.InvalidArgumentException('Missing address in input')
return register_remote_addrs(inputdata['addresses'], configmanager)
return await register_remote_addrs(inputdata['addresses'], configmanager)
if 'node' not in inputdata:
raise exc.InvalidArgumentException('Missing node name in input')
mac = _get_mac_from_query(pathcomponents)
@@ -581,7 +583,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents):
'/'.join(pathcomponents)))
handler = info['handler'].NodeHandler(info, configmanager)
try:
eval_node(configmanager, handler, info, inputdata['node'],
await eval_node(configmanager, handler, info, inputdata['node'],
manual=True)
except Exception as e:
# or... incorrect passworod provided..
@@ -654,10 +656,10 @@ async def _recheck_nodes(nodeattribs, configmanager):
# if already in progress, don't run again
# it may make sense to schedule a repeat, but will try the easier and less redundant way first
return
with rechecklock:
return _recheck_nodes_backend(nodeattribs, configmanager)
async with rechecklock:
return await _recheck_nodes_backend(nodeattribs, configmanager)
def _recheck_nodes_backend(nodeattribs, configmanager):
async def _recheck_nodes_backend(nodeattribs, configmanager):
global rechecker
_map_unique_ids(nodeattribs)
# for the nodes whose attributes have changed, consider them as potential
@@ -673,7 +675,7 @@ def _recheck_nodes_backend(nodeattribs, configmanager):
# Now we go through ones we did not find earlier
for mac in list(unknown_info):
try:
_recheck_single_unknown(configmanager, mac)
await _recheck_single_unknown(configmanager, mac)
except Exception:
traceback.print_exc()
continue
@@ -685,19 +687,19 @@ def _recheck_nodes_backend(nodeattribs, configmanager):
if info['handler'] is None:
next
handler = info['handler'].NodeHandler(info, configmanager)
discopool.spawn_n(eval_node, configmanager, handler, info, nodename)
util.spawn(eval_node(configmanager, handler, info, nodename))
except Exception:
traceback.print_exc()
log.log({'error': 'Unexpected error during discovery of {0}, check debug '
'logs'.format(nodename)})
def _recheck_single_unknown(configmanager, mac):
async def _recheck_single_unknown(configmanager, mac):
info = unknown_info.get(mac, None)
_recheck_single_unknown_info(configmanager, info)
await _recheck_single_unknown_info(configmanager, info)
def _recheck_single_unknown_info(configmanager, info):
async def _recheck_single_unknown_info(configmanager, info):
global rechecker
global rechecktime
if not info or info['handler'] is None:
@@ -728,12 +730,12 @@ def _recheck_single_unknown_info(configmanager, info):
if rechecker is not None and rechecktime > util.monotonic_time() + 300:
rechecker.cancel()
# if cancel did not result in dead, then we are in progress
if rechecker is None or rechecker.dead:
if rechecker is None or rechecker.done():
rechecktime = util.monotonic_time() + 300
rechecker = util.spawn_after(300, _periodic_recheck,
configmanager)
return
nodename, info['maccount'] = get_nodename(configmanager, handler, info)
nodename, info['maccount'] = await get_nodename(configmanager, handler, info)
if nodename:
if handler.https_supported:
dp = configmanager.get_node_attributes([nodename],
@@ -745,27 +747,30 @@ def _recheck_single_unknown_info(configmanager, info):
known_nodes[nodename][info['hwaddr']] = info
info['discostatus'] = 'discovered'
return # already known, no need for more
discopool.spawn_n(eval_node, configmanager, handler, info, nodename)
util.spawn(eval_node(configmanager, handler, info, nodename))
def safe_detected(info):
print(repr(info['services']))
if 'hwaddr' not in info or not info['hwaddr']:
print("No mac!!!")
return
if info['hwaddr'] in runningevals:
print("Already mac!!!")
# Do not evaluate the same mac multiple times at once
return
runningevals[info['hwaddr']] = discopool.spawn(eval_detected, info)
runningevals[info['hwaddr']] = util.spawn(eval_detected(info))
def eval_detected(info):
async def eval_detected(info):
try:
detected(info)
await detected(info)
except Exception as e:
traceback.print_exc()
del runningevals[info['hwaddr']]
def detected(info):
async def detected(info):
global rechecker
global rechecktime
if not cfm.config_is_ready():
@@ -783,7 +788,7 @@ def detected(info):
return
if (handler and not handler.NodeHandler.adequate(info) and
info.get('protocol', None)):
eventlet.spawn_after(10, info['protocol'].fix_info, info,
util.spawn_after(10, info['protocol'].fix_info, info,
safe_detected)
return
if info['hwaddr'] in known_info and 'addresses' in info:
@@ -816,7 +821,9 @@ def detected(info):
cfg = cfm.ConfigManager(None)
if handler:
handler = handler.NodeHandler(info, cfg)
handler.scan()
res = handler.scan()
if inspect.isawaitable(res):
await res
try:
if 'modelnumber' not in info:
info['modelnumber'] = info['attributes']['enclosure-machinetype-model'][0]
@@ -858,7 +865,7 @@ def detected(info):
)})
if rechecker is not None and rechecktime > util.monotonic_time() + 300:
rechecker.cancel()
if rechecker is None or rechecker.dead:
if rechecker is None or rechecker.done():
rechecktime = util.monotonic_time() + 300
rechecker = util.spawn_after(300, _periodic_recheck, cfg)
unknown_info[info['hwaddr']] = info
@@ -866,7 +873,7 @@ def detected(info):
#TODO, eventlet spawn after to recheck sooner, or somehow else
# influence periodic recheck to shorten delay?
return
nodename, info['maccount'] = get_nodename(cfg, handler, info)
nodename, info['maccount'] = await get_nodename(cfg, handler, info)
if nodename and handler and handler.https_supported:
dp = cfg.get_node_attributes([nodename],
('pubkeys.tls_hardwaremanager', 'id.uuid', 'discovery.policy'))
@@ -893,7 +900,7 @@ def detected(info):
#for now defer probe until inside eval_node. We might not have
#a nodename without probe in the future.
if nodename and handler:
eval_node(cfg, handler, info, nodename)
await eval_node(cfg, handler, info, nodename)
elif handler:
#log.log(
# {'info': 'Detected unknown {0} with hwaddr {1} at '
@@ -1038,7 +1045,7 @@ def get_nodename_sysdisco(cfg, handler, info):
return nl[0]
def get_nodename(cfg, handler, info):
async def get_nodename(cfg, handler, info):
nodename = None
maccount = None
info['verified'] = False
@@ -1080,7 +1087,7 @@ def get_nodename(cfg, handler, info):
if not nodename: # as a last resort, search switches for info
# This is the slowest potential operation, so we hope for the
# best to occur prior to this
nodename, macinfo = macmap.find_nodeinfo_by_mac(info['hwaddr'], cfg)
nodename, macinfo = await macmap.find_nodeinfo_by_mac(info['hwaddr'], cfg)
maccount = macinfo['maccount']
if nodename:
if handler.devname == 'SMM':
@@ -1198,7 +1205,7 @@ def search_smms_by_cert(currsmm, cert, cfg):
return search_smms_by_cert(exnl[0], cert, cfg)
def eval_node(cfg, handler, info, nodename, manual=False):
async def eval_node(cfg, handler, info, nodename, manual=False):
try:
handler.probe() # unicast interrogation as possible to get more data
# switch concurrently
@@ -1237,7 +1244,7 @@ def eval_node(cfg, handler, info, nodename, manual=False):
info['verfied'] = True
info['enclosure.bay'] = match[1]
if match[2]:
if not discover_node(cfg, handler, info, match[2], manual):
if not await discover_node(cfg, handler, info, match[2], manual):
pending_nodes[match[2]] = info
return
if 'enclosure.bay' not in info:
@@ -1304,7 +1311,7 @@ def eval_node(cfg, handler, info, nodename, manual=False):
info['discostatus'] = 'unidentified'
return
nodename = nl[0]
if not discover_node(cfg, handler, info, nodename, manual):
if not await discover_node(cfg, handler, info, nodename, manual):
# store it as pending, assuming blocked on enclosure
# assurance...
pending_nodes[nodename] = info
@@ -1324,7 +1331,7 @@ def eval_node(cfg, handler, info, nodename, manual=False):
fprints = macmap.get_node_fingerprints(nodename, cfg)
for fprint in fprints:
if util.cert_matches(fprint[0], handler.https_cert):
if not discover_node(cfg, handler, info,
if not await discover_node(cfg, handler, info,
nodename, manual):
pending_nodes[nodename] = info
return
@@ -1336,11 +1343,11 @@ def eval_node(cfg, handler, info, nodename, manual=False):
'switch.'.format(nodename, handler.devname)
log.log({'error': errorstr})
return
if not discover_node(cfg, handler, info, nodename, manual):
if not await discover_node(cfg, handler, info, nodename, manual):
pending_nodes[nodename] = info
def discover_node(cfg, handler, info, nodename, manual):
async def discover_node(cfg, handler, info, nodename, manual):
if manual:
if not cfg.is_node(nodename):
raise exc.InvalidArgumentException(
@@ -1449,15 +1456,14 @@ def discover_node(cfg, handler, info, nodename, manual):
log.log({'error': 'Unable to get BMC address for {0]'.format(nodename)})
else:
bmcaddr = bmcaddr.split('/', 1)[0]
wait_for_connection(bmcaddr)
socket.getaddrinfo(bmcaddr, 443)
await wait_for_connection(bmcaddr)
subprocess.check_call(['/opt/confluent/bin/nodeconfig', nodename] + nodeconfig)
log.log({'info': 'Configured {0} ({1})'.format(nodename,
handler.devname)})
info['discostatus'] = 'discovered'
for i in pending_by_uuid.get(curruuid, []):
eventlet.spawn_n(_recheck_single_unknown_info, cfg, i)
util.spawn(_recheck_single_unknown_info(cfg, i))
try:
del pending_by_uuid[curruuid]
except KeyError:
@@ -1545,7 +1551,7 @@ async def _handle_nodelist_change(configmanager):
await _recheck_nodes((), configmanager)
if needaddhandled:
needaddhandled = False
nodeaddhandler = eventlet.spawn(_handle_nodelist_change, configmanager)
nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager))
else:
nodeaddhandler = None
@@ -1579,7 +1585,7 @@ async def newnodes(added, deleting, renamed, configmanager):
rechecker = None
rechecktime = None
rechecklock = eventlet.semaphore.Semaphore()
rechecklock = asyncio.Lock()
async def _periodic_recheck(configmanager):
global rechecker
@@ -1599,31 +1605,35 @@ async def _periodic_recheck(configmanager):
configmanager)
def rescan():
async def rescan():
_map_unique_ids()
global scanner
if scanner:
return
else:
scanner = eventlet.spawn(blocking_scan)
remotescan()
print("begin")
scanner = util.spawn(blocking_scan())
print("bg")
await remotescan()
def remotescan():
async def remotescan():
mycfm = cfm.ConfigManager(None)
myname = collective.get_myname()
for remagent in get_subscriptions():
try:
affluent.renotify_me(remagent, mycfm, myname)
await affluent.renotify_me(remagent, mycfm, myname)
except Exception as e:
log.log({'error': 'Unexpected problem asking {} for discovery notifications'.format(remagent)})
def blocking_scan():
async def blocking_scan():
global scanner
slpscan = eventlet.spawn(slp.active_scan, safe_detected, slp)
ssdpscan = eventlet.spawn(ssdp.active_scan, safe_detected, ssdp)
slpscan.wait()
ssdpscan.wait()
slpscan = util.spawn(slp.active_scan(safe_detected, slp))
#ssdpscan = eventlet.spawn(ssdp.active_scan, safe_detected, ssdp)
print("beign slpscan")
await slpscan
print("end scan")
#ssdpscan.wait()
scanner = None
def start_detection():
@@ -1644,7 +1654,7 @@ def start_detection():
if rechecker is None:
rechecktime = util.monotonic_time() + 900
rechecker = util.spawn_after(900, _periodic_recheck, cfg)
eventlet.spawn_n(ssdp.snoop, safe_detected, None, ssdp, get_node_by_uuid_or_mac)
#eventlet.spawn(ssdp.snoop(safe_detected, None, ssdp, get_node_by_uuid_or_mac))
def stop_autosense():
for watcher in list(autosensors):
@@ -1652,10 +1662,10 @@ def stop_autosense():
autosensors.discard(watcher)
def start_autosense():
autosensors.add(eventlet.spawn(slp.snoop, safe_detected, slp))
autosensors.add(util.spawn(slp.snoop(safe_detected, slp)))
#autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns))
autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid))
eventlet.spawn(remotescan)
#autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid))
util.spawn(remotescan())
nodes_by_fprint = {}
@@ -1699,7 +1709,10 @@ def _map_unique_ids(nodes=None):
nodes_by_fprint[fprint] = node
if __name__ == '__main__':
async def main():
start_detection()
while True:
eventlet.sleep(30)
await asyncio.sleep(30)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -15,24 +15,19 @@
import confluent.discovery.handlers.generic as generic
import confluent.exceptions as exc
import confluent.netutil as netutil
import eventlet.support.greendns
# Provide foundation for general IPMI device configuration
import pyghmi.exceptions as pygexc
ipmicommand = eventlet.import_patched('pyghmi.ipmi.command')
ipmicommand.session.select = eventlet.green.select
ipmicommand.session.threading = eventlet.green.threading
ipmicommand.session.socket.getaddrinfo = eventlet.support.greendns.getaddrinfo
getaddrinfo = eventlet.support.greendns.getaddrinfo
import aiohmi.exceptions as pygexc
import aiohmi.ipmi.command as ipmicommand
import socket
class NodeHandler(generic.NodeHandler):
DEFAULT_USER = 'USERID'
DEFAULT_PASS = 'PASSW0RD'
def _get_ipmicmd(self, user=None, password=None):
async def _get_ipmicmd(self, user=None, password=None):
priv = None
if user is None or password is None:
if self.trieddefault:
@@ -42,7 +37,7 @@ class NodeHandler(generic.NodeHandler):
user = self.DEFAULT_USER
if password is None:
password = self.DEFAULT_PASS
return ipmicommand.Command(self.ipaddr, user, password,
return await ipmicommand.create(self.ipaddr, user, password,
privlevel=priv, keepalive=False)
def __init__(self, info, configmanager):
@@ -56,7 +51,7 @@ class NodeHandler(generic.NodeHandler):
def config(self, nodename, reset=False):
self._bmcconfig(nodename, reset)
def _bmcconfig(self, nodename, reset=False, customconfig=None, vc=None):
async def _bmcconfig(self, nodename, reset=False, customconfig=None, vc=None):
# TODO(jjohnson2): set ip parameters, user/pass, alert cfg maybe
# In general, try to use https automation, to make it consistent
# between hypothetical secure path and today.
@@ -69,7 +64,7 @@ class NodeHandler(generic.NodeHandler):
passwd = creds.get(nodename, {}).get(
'secret.hardwaremanagementpassword', {}).get('value', None)
try:
ic = self._get_ipmicmd()
ic = await self._get_ipmicmd()
passwd = self.DEFAULT_PASS
except pygexc.IpmiException as pi:
havecustomcreds = False
@@ -82,14 +77,14 @@ class NodeHandler(generic.NodeHandler):
else:
passwd = self.DEFAULT_PASS
if havecustomcreds:
ic = self._get_ipmicmd(user, passwd)
ic = await self._get_ipmicmd(user, passwd)
else:
raise
if vc:
ic.register_key_handler(vc)
currusers = ic.get_users()
lanchan = ic.get_network_channel()
userdata = ic.xraw_command(netfn=6, command=0x44, data=(lanchan,
currusers = await ic.get_users()
lanchan = await ic.get_network_channel()
userdata = await ic.xraw_command(netfn=6, command=0x44, data=(lanchan,
1))
userdata = bytearray(userdata['data'])
maxusers = userdata[0] & 0b111111
@@ -114,7 +109,7 @@ class NodeHandler(generic.NodeHandler):
newuserslot = uid
if newpass != passwd: # don't mess with existing if no change
ic.set_user_password(newuserslot, password=newpass)
ic = self._get_ipmicmd(user, passwd)
ic = await self._get_ipmicmd(user, passwd)
if vc:
ic.register_key_handler(vc)
break
@@ -126,7 +121,7 @@ class NodeHandler(generic.NodeHandler):
ic.set_user_password(newuserslot, password=newpass)
ic.set_user_name(newuserslot, newuser)
if havecustomcreds:
ic = self._get_ipmicmd(user, passwd)
ic = await self._get_ipmicmd(user, passwd)
if vc:
ic.register_key_handler(vc)
#We are remote operating on the account we are
@@ -161,7 +156,7 @@ class NodeHandler(generic.NodeHandler):
'fe80::')):
newip = cd['hardwaremanagement.manager']['value']
newip = newip.split('/', 1)[0]
newipinfo = getaddrinfo(newip, 0)[0]
newipinfo = socket.getaddrinfo(newip, 0)[0]
# This getaddrinfo is repeated in get_nic_config, could be
# optimized, albeit with a more convoluted api..
newip = newipinfo[-1][0]

View File

@@ -14,9 +14,8 @@
import confluent.util as util
import errno
import eventlet
import socket
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
class NodeHandler(object):
https_supported = True
@@ -37,6 +36,7 @@ class NodeHandler(object):
self.relay_server = None
self.web_ip = None
self.web_port = None
self.https_cert = None
# if this is a remote registered component, prefer to use the agent forwarder
if info.get('forwarder_url', False):
self.relay_url = info['forwarder_url']
@@ -114,14 +114,13 @@ class NodeHandler(object):
elif self._certfailreason == 2:
return 'unreachable'
@property
def https_cert(self):
async def get_https_cert(self):
if self._fp:
return self._fp
ip, port = self.get_web_port_and_ip()
wc = webclient.SecureHTTPConnection(ip, verifycallback=self._savecert, port=port)
ip, port = await self.get_web_port_and_ip()
wc = webclient.WebConnection(ip, verifycallback=self._savecert, port=port)
try:
wc.connect()
await wc.request('GET', '/')
except IOError as ie:
if ie.errno == errno.ECONNREFUSED:
self._certfailreason = 1
@@ -134,16 +133,17 @@ class NodeHandler(object):
except Exception:
self._certfailreason = 2
return None
self.https_cert = self._fp
return self._fp
def get_web_port_and_ip(self):
async def get_web_port_and_ip(self):
if self.web_ip:
return self.web_ip, self.web_port
# get target ip and port, either direct or relay as applicable
if self.relay_url:
kv = util.TLSCertVerifier(self.configmanager, self.relay_server,
'pubkeys.tls_hardwaremanager').verify_cert
w = webclient.SecureHTTPConnection(self.relay_server, verifycallback=kv)
w = webclient.WebConnection(self.relay_server, verifycallback=kv)
relaycreds = self.configmanager.get_node_attributes(self.relay_server, 'secret.*', decrypt=True)
relaycreds = relaycreds.get(self.relay_server, {})
relayuser = relaycreds.get('secret.hardwaremanagementuser', {}).get('value', None)
@@ -151,8 +151,7 @@ class NodeHandler(object):
if not relayuser or not relaypass:
raise Exception('No credentials for {0}'.format(self.relay_server))
w.set_basic_credentials(relayuser, relaypass)
w.connect()
w.request('GET', self.relay_url)
await w.request('GET', self.relay_url)
r = w.getresponse()
rb = r.read()
if r.code != 302:

View File

@@ -73,7 +73,7 @@ class NodeHandler(bmchandler.NodeHandler):
if slot != 0:
self.info['enclosure.bay'] = slot
def probe(self):
async def probe(self):
if self.info.get('enclosure.bay', 0) == 0:
self.scan()
if self.info.get('enclosure.bay', 0) != 0:
@@ -85,8 +85,8 @@ class NodeHandler(bmchandler.NodeHandler):
try:
# we are a dense platform, but the SLP data did not give us slot
# attempt to probe using IPMI
ipmicmd = self._get_ipmicmd()
guiddata = ipmicmd.xraw_command(netfn=6, command=8)
ipmicmd = await self._get_ipmicmd()
guiddata = await ipmicmd.xraw_command(netfn=6, command=8)
self.info['uuid'] = pygutil.decode_wireformat_uuid(
guiddata['data']).lower()
ipmicmd.oem_init()

View File

@@ -12,21 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import confluent.discovery.handlers.generic as generic
import confluent.exceptions as exc
import confluent.netutil as netutil
import confluent.util as util
import eventlet
import eventlet.support.greendns
import json
try:
from urllib import urlencode
except ImportError:
from urllib.parse import urlencode
import socket
from urllib.parse import urlencode
getaddrinfo = eventlet.support.greendns.getaddrinfo
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
class NodeHandler(generic.NodeHandler):
devname = 'TSM'
@@ -45,9 +40,11 @@ class NodeHandler(generic.NodeHandler):
self.atdefault = True
super(NodeHandler, self).__init__(info, configmanager)
def scan(self):
c = webclient.SecureHTTPConnection(self.ipaddr, 443, verifycallback=self.validate_cert)
i = c.grab_json_response('/redfish/v1/')
async def scan(self):
await self.get_https_cert()
c = webclient.WebConnection(
self.ipaddr, 443, verifycallback=self.validate_cert)
i = await c.grab_json_response('/redfish/v1/')
uuid = i.get('UUID', None)
if uuid:
self.info['uuid'] = uuid.lower()
@@ -58,20 +55,21 @@ class NodeHandler(generic.NodeHandler):
fprint = util.get_fingerprint(self.https_cert)
return util.cert_matches(fprint, certificate)
def _get_wc(self):
async def _get_wc(self):
authdata = { # start by trying factory defaults
'username': self.DEFAULT_USER,
'password': self.DEFAULT_PASS,
}
wc = webclient.SecureHTTPConnection(self.ipaddr, 443, verifycallback=self.validate_cert)
await self.get_https_cert()
wc = webclient.WebConnection(self.ipaddr, 443, verifycallback=self.validate_cert)
wc.set_header('Content-Type', 'application/json')
authmode = 0
if not self.trieddefault:
rsp, status = wc.grab_json_response_with_status('/api/session', authdata)
rsp, status = await wc.grab_json_response_with_status('/api/session', authdata)
if status == 403:
wc.set_header('Content-Type', 'application/x-www-form-urlencoded')
authmode = 1
rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata))
rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata))
else:
authmode = 2
if status > 400:
@@ -101,19 +99,19 @@ class NodeHandler(generic.NodeHandler):
rpasschange, method='PATCH')
if status >= 200 and status < 300:
authdata['password'] = self.targpass
eventlet.sleep(10)
await asyncio.sleep(10)
else:
if b'[web.lua] Error in RequestHandler, thread' in rsp:
rsp, status = wc.grab_json_response_with_status('/api/reset-pass', passchange)
rsp, status = await wc.grab_json_response_with_status('/api/reset-pass', passchange)
else:
raise Exception("Redfish may not have been ready yet" + repr(rsp))
else:
rsp, status = wc.grab_json_response_with_status('/api/reset-pass', urlencode(passchange))
rsp, status = await wc.grab_json_response_with_status('/api/reset-pass', urlencode(passchange))
authdata['password'] = self.targpass
if authmode == 2:
rsp, status = wc.grab_json_response_with_status('/api/session', authdata)
rsp, status = await wc.grab_json_response_with_status('/api/session', authdata)
else:
rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata))
rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata))
self.csrftok = rsp['CSRFToken']
self.channel = rsp['channel']
self.curruser = self.DEFAULT_USER
@@ -129,10 +127,10 @@ class NodeHandler(generic.NodeHandler):
authdata['username'] = self.curruser
authdata['password'] = self.currpass
if authmode != 1:
rsp, status = wc.grab_json_response_with_status('/api/session', authdata)
rsp, status = await wc.grab_json_response_with_status('/api/session', authdata)
if authmode == 1 or status == 403:
wc.set_header('Content-Type', 'application/x-www-form-urlencoded')
rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata))
rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata))
if status != 200:
return None
self.csrftok = rsp['CSRFToken']
@@ -141,10 +139,10 @@ class NodeHandler(generic.NodeHandler):
authdata['username'] = self.targuser
authdata['password'] = self.targpass
if authmode != 1:
rsp, status = wc.grab_json_response_with_status('/api/session', authdata)
rsp, status = await wc.grab_json_response_with_status('/api/session', authdata)
if authmode == 1 or status == 403:
wc.set_header('Content-Type', 'application/x-www-form-urlencoded')
rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata))
rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata))
if status != 200:
return None
self.curruser = self.targuser
@@ -153,7 +151,7 @@ class NodeHandler(generic.NodeHandler):
self.channel = rsp['channel']
return wc
def config(self, nodename):
async def config(self, nodename):
self.nodename = nodename
creds = self.configmanager.get_node_attributes(
nodename, ['secret.hardwaremanagementuser',
@@ -167,7 +165,7 @@ class NodeHandler(generic.NodeHandler):
passwd = util.stringify(passwd)
self.targuser = user
self.targpass = passwd
wc = self._get_wc()
wc = await self._get_wc()
wc.set_header('X-CSRFTOKEN', self.csrftok)
curruserinfo = {}
authupdate = False
@@ -202,7 +200,7 @@ class NodeHandler(generic.NodeHandler):
'fe80::')):
newip = cd['hardwaremanagement.manager']['value']
newip = newip.split('/', 1)[0]
newipinfo = getaddrinfo(newip, 0)[0]
newipinfo = socket.getaddrinfo(newip, 0)[0]
newip = newipinfo[-1][0]
if ':' in newip:
raise exc.NotImplementedException('IPv6 remote config TODO')
@@ -239,7 +237,7 @@ def remote_nodecfg(nodename, cfm):
ipaddr = cfg.get(nodename, {}).get('hardwaremanagement.manager', {}).get(
'value', None)
ipaddr = ipaddr.split('/', 1)[0]
ipaddr = getaddrinfo(ipaddr, 0)[0][-1]
ipaddr = socket.getaddrinfo(ipaddr, 0)[0][-1]
if not ipaddr:
raise Exception('Cannot remote configure a system without known '
'address')
@@ -254,4 +252,4 @@ if __name__ == '__main__':
info = {'addresses': [[sys.argv[1]]] }
print(repr(info))
testr = NodeHandler(info, c)
testr.config(sys.argv[2])
testr.config(sys.argv[2])

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import base64
import codecs
import confluent.discovery.handlers.imm as immhandler
@@ -19,15 +20,12 @@ import confluent.exceptions as exc
import confluent.netutil as netutil
import confluent.util as util
import errno
import eventlet
import eventlet.support.greendns
import json
import os
import pyghmi.exceptions as pygexc
import eventlet.green.socket as socket
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.exceptions as pygexc
import socket
import aiohmi.util.webclient as webclient
import struct
getaddrinfo = eventlet.support.greendns.getaddrinfo
def fixuuid(baduuid):
@@ -39,7 +37,8 @@ def fixuuid(baduuid):
uuid = (a[:8], a[8:12], a[12:16], baduuid[19:23], baduuid[24:])
return '-'.join(uuid).lower()
class LockedUserException(Exception):
class LockedUserException(BaseException):
pass
@@ -88,8 +87,8 @@ class NodeHandler(immhandler.NodeHandler):
def probe(self):
return None
def scan(self):
ip, port = self.get_web_port_and_ip()
async def scan(self):
ip, port = await self.get_web_port_and_ip()
c = webclient.SecureHTTPConnection(ip, port,
verifycallback=self.validate_cert)
i = c.grab_json_response('/api/providers/logoninfo')
@@ -131,7 +130,7 @@ class NodeHandler(immhandler.NodeHandler):
if slot != 0:
self.info['enclosure.bay'] = slot
def preconfig(self, possiblenode):
async def preconfig(self, possiblenode):
self.tmpnodename = possiblenode
ff = self.info.get('attributes', {}).get('enclosure-form-factor', '')
if ff not in ('dense-computing', [u'dense-computing']):
@@ -152,7 +151,7 @@ class NodeHandler(immhandler.NodeHandler):
disableipmi = False
if currfirm >= 3:
# IPMI is disabled and we need it, also we need to go to *some* password
wc = self.wc
wc = await self.get_wc()
if not wc:
# We cannot try to enable SMM here without risking real credentials
# on the wire to untrusted parties
@@ -195,7 +194,7 @@ class NodeHandler(immhandler.NodeHandler):
fprint = util.get_fingerprint(self.https_cert)
return util.cert_matches(fprint, certificate)
def get_webclient(self, username, password, newpassword):
async def get_webclient(self, username, password, newpassword):
wc = self._wc.dupe()
try:
wc.connect()
@@ -293,7 +292,7 @@ class NodeHandler(immhandler.NodeHandler):
if pwdchanged:
# Remove the minimum change interval, to allow sane
# password changes after provisional changes
wc = self.wc
wc = await self.get_wc()
self.set_password_policy('', wc)
return (wc, pwdchanged)
elif rspdata.get('locktime', 0) > 0:
@@ -301,8 +300,7 @@ class NodeHandler(immhandler.NodeHandler):
'The user "{0}" has been locked out by too many incorrect password attempts'.format(username))
return (None, rspdata)
@property
def wc(self):
async def get_wc(self):
passwd = None
isdefault = True
errinfo = {}
@@ -319,7 +317,7 @@ class NodeHandler(immhandler.NodeHandler):
nodename = None
inpreconfig = True
if self._currcreds[0] is not None:
wc, pwdchanged = self.get_webclient(self._currcreds[0], self._currcreds[1], None)
wc, pwdchanged = await self.get_webclient(self._currcreds[0], self._currcreds[1], None)
if wc:
return wc
if nodename:
@@ -342,7 +340,7 @@ class NodeHandler(immhandler.NodeHandler):
# (TempW0rd42)
passwd = 'TempW0rd42'
try:
wc, pwdchanged = self.get_webclient('USERID', 'PASSW0RD', passwd)
wc, pwdchanged = await self.get_webclient('USERID', 'PASSW0RD', passwd)
except LockedUserException as lue:
wc = None
pwdchanged = 'The user "USERID" has been locked out by too many incorrect password attempts'
@@ -363,11 +361,11 @@ class NodeHandler(immhandler.NodeHandler):
if self.tmppasswd:
if savedexc:
raise savedexc
wc, errinfo = self.get_webclient('USERID', self.tmppasswd, passwd)
wc, errinfo = await self.get_webclient('USERID', self.tmppasswd, passwd)
else:
if user == 'USERID' and savedexc:
raise savedexc
wc, errinfo = self.get_webclient(user, passwd, None)
wc, errinfo = await self.get_webclient(user, passwd, None)
if wc:
return wc
else:
@@ -408,7 +406,7 @@ class NodeHandler(immhandler.NodeHandler):
if user['users_user_name'] == '':
return user['users_user_id']
def _setup_xcc_account(self, username, passwd, wc):
async def _setup_xcc_account(self, username, passwd, wc):
userinfo = wc.grab_json_response('/api/dataset/imm_users')
uid = None
for user in userinfo['items'][0]['users']:
@@ -449,7 +447,7 @@ class NodeHandler(immhandler.NodeHandler):
if status != 200:
rsp = json.loads(rsp)
if rsp.get('error', {}).get('code', 'Unknown') in ('Base.1.8.GeneralError', 'Base.1.12.GeneralError', 'Base.1.14.GeneralError'):
eventlet.sleep(4)
await asyncio.sleep(4)
else:
break
self.tmppasswd = None
@@ -459,7 +457,7 @@ class NodeHandler(immhandler.NodeHandler):
wc.grab_json_response('/api/providers/logout')
self._currcreds = (username, passwd)
def _convert_sha256account(self, user, passwd, wc):
async def _convert_sha256account(self, user, passwd, wc):
# First check if the specified user is sha256...
userinfo = wc.grab_json_response('/api/dataset/imm_users')
curruser = None
@@ -472,7 +470,7 @@ class NodeHandler(immhandler.NodeHandler):
break
if curruser.get('users_pass_is_sha256', 0):
self._wc = None
wc = self.wc
wc = await self.get_wc()
nwc = wc.dupe()
# Have to convert it for being useful with most Lenovo automation tools
# This requires deleting the account entirely and trying again
@@ -511,7 +509,7 @@ class NodeHandler(immhandler.NodeHandler):
userparams = "{0},{1},{2},1,4,0,0,0,0,,8,".format(curruser['users_user_id'], user, tpass)
nwc.grab_json_response('/api/function', {'USER_UserCreate': userparams})
nwc.grab_json_response('/api/providers/logout')
nwc, pwdchanged = self.get_webclient(user, tpass, passwd)
nwc, pwdchanged = await self.get_webclient(user, tpass, passwd)
if not nwc:
if not pwdchanged:
pwdchanged = 'Unknown'
@@ -523,11 +521,11 @@ class NodeHandler(immhandler.NodeHandler):
nwc.grab_json_response('/api/providers/logout')
finally:
self._wc = None
wc = self.wc
wc = await self.get_wc()
wc.grab_json_response('/api/function', {'USER_UserDelete': "{0},{1}".format(tmpuid, '6pmu0ezczzcp')})
wc.grab_json_response('/api/providers/logout')
def config(self, nodename, reset=False):
async def config(self, nodename, reset=False):
self.nodename = nodename
cd = self.configmanager.get_node_attributes(
nodename, ['secret.hardwaremanagementuser',
@@ -546,7 +544,7 @@ class NodeHandler(immhandler.NodeHandler):
nodename, 'discovery.passwordrules')
strruleset = dpp.get(nodename, {}).get(
'discovery.passwordrules', {}).get('value', '')
wc = self.wc
wc = await self.get_wc()
creds = self.configmanager.get_node_attributes(
self.nodename, ['secret.hardwaremanagementuser',
'secret.hardwaremanagementpassword'], decrypt=True)
@@ -557,9 +555,9 @@ class NodeHandler(immhandler.NodeHandler):
raise Exception(
'Request to use default credentials, but refused by target after it has been changed to {0}'.format(self.tmppasswd))
if not isdefault:
self._setup_xcc_account(user, passwd, wc)
wc = self.wc
self._convert_sha256account(user, passwd, wc)
await self._setup_xcc_account(user, passwd, wc)
wc = await self.get_wc()
await self._convert_sha256account(user, passwd, wc)
if (cd.get('hardwaremanagement.method', {}).get('value', 'ipmi') != 'redfish'
or cd.get('console.method', {}).get('value', None) == 'ipmi'):
nwc = wc.dupe()
@@ -587,7 +585,7 @@ class NodeHandler(immhandler.NodeHandler):
updateinf, method='PATCH')
if targbmc and not targbmc.startswith('fe80::'):
newip = targbmc.split('/', 1)[0]
newipinfo = getaddrinfo(newip, 0)[0]
newipinfo = socket.getaddrinfo(newip, 0)[0]
newip = newipinfo[-1][0]
if ':' in newip:
raise exc.NotImplementedException('IPv6 remote config TODO')
@@ -607,7 +605,7 @@ class NodeHandler(immhandler.NodeHandler):
raise exc.InvalidArgumentException('Will not remotely configure a device with no gateway')
wc.grab_json_response('/api/dataset', statargs)
elif self.ipaddr.startswith('fe80::'):
self.configmanager.set_node_attributes(
await self.configmanager.set_node_attributes(
{nodename: {'hardwaremanagement.manager': self.ipaddr}})
else:
raise exc.TargetEndpointUnreachable(
@@ -625,7 +623,7 @@ class NodeHandler(immhandler.NodeHandler):
'value', None)
# ok, set the uuid of the manager...
if em:
self.configmanager.set_node_attributes(
await self.configmanager.set_node_attributes(
{em: {'id.uuid': enclosureuuid}})
def remote_nodecfg(nodename, cfm):
@@ -634,9 +632,9 @@ def remote_nodecfg(nodename, cfm):
ipaddr = cfg.get(nodename, {}).get('hardwaremanagement.manager', {}).get(
'value', None)
ipaddr = ipaddr.split('/', 1)[0]
ipaddr = getaddrinfo(ipaddr, 0)[0][-1]
ipaddr = socket.getaddrinfo(ipaddr, 0)[0][-1]
if not ipaddr:
raise Excecption('Cannot remote configure a system without known '
raise Exception('Cannot remote configure a system without known '
'address')
info = {'addresses': [ipaddr]}
nh = NodeHandler(info, cfm)

View File

@@ -14,14 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import confluent.neighutil as neighutil
import confluent.util as util
import confluent.log as log
import os
import random
import eventlet.greenpool
import eventlet.green.select as select
import eventlet.green.socket as socket
#import eventlet.green.socket as socket
import socket
import struct
import traceback
@@ -116,6 +116,7 @@ def _parse_slp_packet(packet, peer, rsps, xidmap, defer=None, sock=None):
else:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
sock.setblocking(1)
sock.sendto(b'\x00', probepeer)
except Exception:
return
@@ -253,20 +254,51 @@ def _find_srvtype(net, net4, srvtype, addresses, xid):
pass
def _grab_rsps(socks, rsps, interval, xidmap, deferrals):
r = None
res = select.select(socks, (), (), interval)
if res:
r = res[0]
while r:
for s in r:
(rsp, peer) = s.recvfrom(9000)
_parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, s)
res = select.select(socks, (), (), interval)
if not res:
r = None
else:
r = res[0]
import time
def sock_read(fut, sock, cloop, allsocks):
if fut.done():
print("was already done???")
return
if not cloop.remove_reader(sock):
print("Was already removed??")
fut.set_result(sock)
allsocks.discard(sock)
async def _bulk_recvfrom(socks, timeout):
allsocks = set([])
cloop = asyncio.get_running_loop()
done = True
while done:
currfutures = []
for sock in socks:
sock.setblocking(0)
currfut = asyncio.Future()
cloop.add_reader(sock, sock_read, currfut, sock, cloop, allsocks)
currfutures.append(currfut)
done, dumbfutures = await asyncio.wait(currfutures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
for sk in allsocks:
cloop.remove_reader(sk)
for dumbfuture in dumbfutures:
dumbfuture.cancel()
socks = []
for currfut in done:
socks.append(await currfut)
for sock in socks:
sock.setblocking(0)
try:
yield (sock,) + sock.recvfrom(9000)
except socket.error:
print("shouldn't happen...")
continue
print("done recv")
async def _grab_rsps(socks, rsps, interval, xidmap, deferrals):
async for srp in _bulk_recvfrom(socks, interval):
sock, rsp, peer = srp
_parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock)
@@ -335,16 +367,17 @@ def _parse_attrs(data, parsed, xid=None):
parsed['attributes'] = _parse_attrlist(attrstr)
def fix_info(info, handler):
async def fix_info(info, handler):
if '_attempts' not in info:
info['_attempts'] = 10
if info['_attempts'] == 0:
return
info['_attempts'] -= 1
_add_attributes(info)
await _add_attributes(info)
handler(info)
def _add_attributes(parsed):
async def _add_attributes(parsed):
xid = parsed.get('xid', 42)
attrq = _generate_attr_request(parsed['services'][0], xid)
target = None
@@ -360,14 +393,16 @@ def _add_attributes(parsed):
net = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
else:
net = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cloop = asyncio.get_running_loop()
try:
net.settimeout(2.0)
net.connect(target)
except socket.error:
net.settimeout(0)
net.setblocking(0)
await asyncio.wait_for(cloop.sock_connect(net, target), 2.0)
except (socket.error, asyncio.exceptions.TimeoutError) as te:
return
try:
net.sendall(attrq)
rsp = net.recv(8192)
await cloop.sock_sendall(net, attrq)
rsp = await cloop.sock_recv(net, 8192)
net.close()
_parse_attrs(rsp, parsed, xid)
except Exception as e:
@@ -417,9 +452,9 @@ def query_srvtypes(target):
stypes = payload[4:4+stypelen].decode('utf-8')
return stypes.split(',')
def rescan(handler):
async def rescan(handler):
known_peers = set([])
for scanned in scan():
async for scanned in scan():
for addr in scanned['addresses']:
if addr in known_peers:
break
@@ -431,7 +466,7 @@ def rescan(handler):
handler(scanned)
def snoop(handler, protocol=None):
async def snoop(handler, protocol=None):
"""Watch for SLP activity
handler will be called with a dictionary of relevant attributes
@@ -441,10 +476,10 @@ def snoop(handler, protocol=None):
"""
tracelog = log.Logger('trace')
try:
active_scan(handler, protocol)
await active_scan(handler, protocol)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
net = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
net.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
slpg = socket.inet_pton(socket.AF_INET6, 'ff01::123')
@@ -475,7 +510,6 @@ def snoop(handler, protocol=None):
while True:
try:
newmacs = set([])
r, _, _ = select.select((net, net4), (), (), 60)
# clear known_peers and peerbymacaddress
# to avoid stale info getting in...
# rely upon the select(0.2) to catch rapid fire and aggregate ip
@@ -485,29 +519,34 @@ def snoop(handler, protocol=None):
known_peers = set([])
peerbymacaddress = {}
deferpeers = []
while r and len(deferpeers) < 256:
for s in r:
(rsp, peer) = s.recvfrom(9000)
timeo = 60
rdy = True
while rdy and len(deferpeers) < 256:
rdy = False
async for srp in _bulk_recvfrom((net, net4), timeo):
rdy = True
s, rsp, peer = srp
if peer in known_peers:
continue
mac = neighutil.get_hwaddr(peer[0])
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.setblocking(1)
s.sendto(b'\x00', probepeer)
except Exception:
continue
deferpeers.append(peer)
continue
process_peer(newmacs, known_peers, peerbymacaddress, peer)
r, _, _ = select.select((net, net4), (), (), 0.2)
timeo = 0.2
if deferpeers:
eventlet.sleep(2.2)
await asyncio.sleep(2.2)
for peer in deferpeers:
process_peer(newmacs, known_peers, peerbymacaddress, peer)
for mac in newmacs:
peerbymacaddress[mac]['xid'] = 1
_add_attributes(peerbymacaddress[mac])
await _add_attributes(peerbymacaddress[mac])
peerbymacaddress[mac]['hwaddr'] = mac
peerbymacaddress[mac]['protocol'] = protocol
for srvurl in peerbymacaddress[mac].get('urls', ()):
@@ -515,6 +554,7 @@ def snoop(handler, protocol=None):
srvurl = srvurl[:-3]
if srvurl.endswith('://Athena:'):
continue
print(repr(peerbymacaddress[mac]))
if 'service:ipmi' in peerbymacaddress[mac]['services']:
continue
if 'service:lightttpd' in peerbymacaddress[mac]['services']:
@@ -560,13 +600,14 @@ def process_peer(newmacs, known_peers, peerbymacaddress, peer):
newmacs.add(mac)
def active_scan(handler, protocol=None):
async def active_scan(handler, protocol=None):
known_peers = set([])
toprocess = []
# Implement a warmup, inducing neighbor table activity
# by kernel and giving 2 seconds for a retry or two if
# needed
for scanned in scan():
async for scanned in scan():
print('fun with: ' + repr(scanned['services']))
for addr in scanned['addresses']:
if addr in known_peers:
break
@@ -581,7 +622,7 @@ def active_scan(handler, protocol=None):
handler(scanned)
def scan(srvtypes=_slp_services, addresses=None, localonly=False):
async def scan(srvtypes=_slp_services, addresses=None, localonly=False):
"""Find targets providing matching requested srvtypes
This is a generator that will iterate over respondants to the SrvType
@@ -613,23 +654,26 @@ def scan(srvtypes=_slp_services, addresses=None, localonly=False):
# processed, mitigating volume of response traffic
rsps = {}
deferrals = []
print('commence')
for srvtype in srvtypes:
xididx += 1
_find_srvtype(net, net4, srvtype, addresses, initxid + xididx)
xidmap[initxid + xididx] = srvtype
_grab_rsps((net, net4), rsps, 0.1, xidmap, deferrals)
await _grab_rsps((net, net4), rsps, 0.1, xidmap, deferrals)
# now do a more slow check to work to get stragglers,
# but fortunately the above should have taken the brunt of volume, so
# reduced chance of many responses overwhelming receive buffer.
_grab_rsps((net, net4), rsps, 1, xidmap, deferrals)
print('waity')
await _grab_rsps((net, net4), rsps, 1, xidmap, deferrals)
print(len(rsps))
if deferrals:
eventlet.sleep(1.2) # already have a one second pause from select above
await asyncio.sleep(1.2) # already have a one second pause from select above
for defer in deferrals:
rsp, peer = defer
_parse_slp_packet(rsp, peer, rsps, xidmap)
# now to analyze and flesh out the responses
handleids = set([])
gp = eventlet.greenpool.GreenPool(128)
tsks = []
for id in rsps:
for srvurl in rsps[id].get('urls', ()):
if len(srvurl) > 4:
@@ -644,9 +688,10 @@ def scan(srvtypes=_slp_services, addresses=None, localonly=False):
break
else:
continue
gp.spawn_n(_add_attributes, rsps[id])
tsks.append(util.spawn(_add_attributes(rsps[id])))
handleids.add(id)
gp.waitall()
if tsks:
await asyncio.wait(tsks)
for id in handleids:
if 'service:lighttpd' in rsps[id]['services']:
currinf = rsps[id]

View File

@@ -42,12 +42,12 @@ if __name__ == '__main__':
import confluent.config.configmanager as cfm
import confluent.snmputil as snmp
import asyncio
from confluent.networking.lldp import _handle_neighbor_query, get_fingerprint
from confluent.networking.netutil import get_switchcreds, list_switches, get_portnamemap
import eventlet.green.select as select
import eventlet.green.socket as socket
import socket
import confluent.collective.manager as collective
import confluent.exceptions as exc
@@ -55,7 +55,6 @@ import confluent.log as log
import confluent.messages as msg
import confluent.noderange as noderange
import confluent.util as util
from eventlet.greenpool import GreenPool
import eventlet.green.subprocess as subprocess
import fcntl
import eventlet
@@ -63,7 +62,7 @@ import eventlet.semaphore
import msgpack
import random
import re
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
noaffluent = set([])
@@ -124,9 +123,9 @@ def _namesmatch(switchdesc, userdesc):
return True
return False
def _map_switch(args):
async def _map_switch(args):
try:
return _map_switch_backend(args)
return await _map_switch_backend(args)
except (UnicodeError, socket.gaierror):
log.log({'error': "Cannot resolve switch '{0}' to an address".format(
args[0])})
@@ -152,11 +151,11 @@ def _nodelookup(switch, ifname):
return None
def _affluent_map_switch(args):
async def _affluent_map_switch(args):
switch, password, user, cfgm = args
kv = util.TLSCertVerifier(cfgm, switch,
'pubkeys.tls_hardwaremanager').verify_cert
wc = webclient.SecureHTTPConnection(
wc = webclient.WebConnection(
switch, 443, verifycallback=kv, timeout=5)
wc.set_basic_credentials(user, password)
macs, retcode = wc.grab_json_response_with_status('/affluent/macs/by-port')
@@ -241,7 +240,7 @@ def _recv_offload():
eventlet.sleep(0)
def _map_switch_backend(args):
async def _map_switch_backend(args):
"""Manipulate portions of mac address map relevant to a given switch
"""
@@ -267,7 +266,7 @@ def _map_switch_backend(args):
user = None
if switch not in noaffluent:
try:
return _affluent_map_switch(args)
return await _affluent_map_switch(args)
except exc.PubkeyInvalid:
log.log({'error': 'While trying to gather ethernet mac addresses '
'from {0}, the TLS certificate failed validation. '
@@ -433,13 +432,13 @@ def _snmp_map_switch(switch, password, user):
switchbackoff = 30
def find_nodeinfo_by_mac(mac, configmanager):
async def find_nodeinfo_by_mac(mac, configmanager):
now = util.monotonic_time()
if vintage and (now - vintage) < 90 and mac in _nodesbymac:
return _nodesbymac[mac][0], {'maccount': _nodesbymac[mac][1]}
# do not actually sweep switches more than once every 30 seconds
# however, if there is an update in progress, wait on it
for _ in update_macmap(configmanager,
async for _ in update_macmap(configmanager,
vintage and (now - vintage) < switchbackoff):
if mac in _nodesbymac:
return _nodesbymac[mac][0], {'maccount': _nodesbymac[mac][1]}
@@ -449,10 +448,10 @@ def find_nodeinfo_by_mac(mac, configmanager):
return None, {'maccount': 0}
mapupdating = eventlet.semaphore.Semaphore()
mapupdating = asyncio.Lock()
def update_macmap(configmanager, impatient=False):
async def update_macmap(configmanager, impatient=False):
"""Interrogate switches to build/update mac table
Begin a rebuild process. This process is a generator that will yield
@@ -462,13 +461,13 @@ def update_macmap(configmanager, impatient=False):
"""
if mapupdating.locked():
while mapupdating.locked():
eventlet.sleep(1)
await asyncio.sleep(1)
yield None
return
if impatient:
return
completions = _full_updatemacmap(configmanager)
for completion in completions:
async for completion in completions:
try:
yield completion
except GeneratorExit:
@@ -483,7 +482,7 @@ def _finish_update(completions):
pass
def _full_updatemacmap(configmanager):
async def _full_updatemacmap(configmanager):
global vintage
global _apimacmap
global _macmap
@@ -492,7 +491,7 @@ def _full_updatemacmap(configmanager):
global _macsbyswitch
global switchbackoff
start = util.monotonic_time()
with mapupdating:
async with mapupdating:
vintage = util.monotonic_time()
# Clear all existing entries
_macmap = {}
@@ -554,10 +553,12 @@ def _full_updatemacmap(configmanager):
if switch not in switches:
del _macsbyswitch[switch]
switchauth = get_switchcreds(configmanager, switches)
pool = GreenPool(64)
for ans in pool.imap(_map_switch, switchauth):
vintage = util.monotonic_time()
yield ans
#pool = GreenPool(64)
tsks = []
for sa in switchauth:
tsks.append(_map_switch(sa))
for tsk in asyncio.as_completed(tsks):
yield await tsk
_apimacmap = _macmap
endtime = util.monotonic_time()
duration = endtime - start
@@ -574,7 +575,7 @@ def _dump_locations(info, macaddr, nodename=None):
portinfo = []
for location in info:
portinfo.append({'switch': location[0],
'port': location[1], 'macsonport': location[2]})
'port': location[1], 'macsonport': location[2]})
retdata['ports'] = sorted(portinfo, key=lambda x: x['macsonport'],
reverse=True)
yield msg.KeyValueData(retdata)
@@ -587,7 +588,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents):
pathcomponents == ['networking', 'macs', 'rescan']):
if inputdata != {'rescan': 'start'}:
raise exc.InvalidArgumentException('Input must be rescan=start')
eventlet.spawn_n(rescan, configmanager)
util.spawn(rescan(configmanager))
return [msg.KeyValueData({'rescan': 'started'})]
raise exc.NotImplementedException(
'Operation {0} on {1} not implemented'.format(
@@ -701,8 +702,8 @@ def dump_macinfo(macaddr):
return _dump_locations(info, macaddr, _nodesbymac.get(macaddr, (None,))[0])
def rescan(cfg):
for _ in update_macmap(cfg):
async def rescan(cfg):
async for _ in update_macmap(cfg):
pass

View File

@@ -55,7 +55,7 @@ class WebClient(object):
return rsp
def renotify_me(node, configmanager, myname):
async def renotify_me(node, configmanager, myname):
creds = configmanager.get_node_attributes(
node, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
wc = WebClient(node, configmanager, creds)