diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index b6409a6d..8f9953ff 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -61,6 +61,7 @@ # retry until uppercase, lowercase, digit, and symbol all present) # - Apply defined configuration to endpoint +import asyncio import base64 import confluent.config.configmanager as cfm import confluent.collective.manager as collective @@ -80,20 +81,21 @@ import confluent.messages as msg import confluent.networking.macmap as macmap import confluent.noderange as noderange import confluent.util as util +import inspect import json import eventlet import traceback import shlex import struct -import eventlet.green.socket as socket +#import eventlet.green.socket as socket +import socket import socket as nsocket -import eventlet.green.subprocess as subprocess -webclient = eventlet.import_patched('pyghmi.util.webclient') +import subprocess +#import eventlet.green.subprocess as subprocess +#webclient = eventlet.import_patched('pyghmi.util.webclient') import eventlet -import eventlet.greenpool -import eventlet.semaphore autosensors = set() scanner = None @@ -153,7 +155,6 @@ servicebyname = { 'lenovo-tsm': 'service:lenovo-tsm', } -discopool = eventlet.greenpool.GreenPool(500) runningevals = {} # Passive-only auto-detection protocols: # PXE @@ -512,8 +513,8 @@ def save_subscriptions(subs): dso.write(json.dumps(subs)) -def register_remote_addrs(addresses, configmanager): - def register_remote_addr(addr): +async def register_remote_addrs(addresses, configmanager): + async def register_remote_addr(addr): nd = { 'addresses': [(addr, 443)] } @@ -522,12 +523,13 @@ def register_remote_addrs(addresses, configmanager): return addr, False sd['hwaddr'] = sd['attributes']['mac-address'] nh = xcc.NodeHandler(sd, configmanager) - nh.scan() - detected(nh.info) + await nh.scan() + await detected(nh.info) return addr, True - rpool = eventlet.greenpool.GreenPool(512) + #rpool = eventlet.greenpool.GreenPool(512) for count in iterate_addrs(addresses, True): yield msg.ConfluentResourceCount(count) + return # ASYNC for result in rpool.imap(register_remote_addr, iterate_addrs(addresses)): if result[1]: yield msg.CreatedResource(result[0]) @@ -535,7 +537,7 @@ def register_remote_addrs(addresses, configmanager): yield msg.ConfluentResourceNotFound(result[0]) -def handle_api_request(configmanager, inputdata, operation, pathcomponents): +async def handle_api_request(configmanager, inputdata, operation, pathcomponents): if pathcomponents == ['discovery', 'autosense']: return handle_autosense_config(operation, inputdata) if operation == 'retrieve' and pathcomponents[:2] == ['discovery', 'subscriptions']: @@ -549,7 +551,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents): pathcomponents == ['discovery', 'rescan']): if inputdata != {'rescan': 'start'}: raise exc.InvalidArgumentException() - rescan() + await rescan() return (msg.KeyValueData({'rescan': 'started'}),) elif operation in ('update', 'create') and pathcomponents[:2] == ['discovery', 'subscriptions']: target = pathcomponents[2] @@ -570,7 +572,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents): if pathcomponents == ['discovery', 'register']: if 'addresses' not in inputdata: raise exc.InvalidArgumentException('Missing address in input') - return register_remote_addrs(inputdata['addresses'], configmanager) + return await register_remote_addrs(inputdata['addresses'], configmanager) if 'node' not in inputdata: raise exc.InvalidArgumentException('Missing node name in input') mac = _get_mac_from_query(pathcomponents) @@ -581,7 +583,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents): '/'.join(pathcomponents))) handler = info['handler'].NodeHandler(info, configmanager) try: - eval_node(configmanager, handler, info, inputdata['node'], + await eval_node(configmanager, handler, info, inputdata['node'], manual=True) except Exception as e: # or... incorrect passworod provided.. @@ -654,10 +656,10 @@ async def _recheck_nodes(nodeattribs, configmanager): # if already in progress, don't run again # it may make sense to schedule a repeat, but will try the easier and less redundant way first return - with rechecklock: - return _recheck_nodes_backend(nodeattribs, configmanager) + async with rechecklock: + return await _recheck_nodes_backend(nodeattribs, configmanager) -def _recheck_nodes_backend(nodeattribs, configmanager): +async def _recheck_nodes_backend(nodeattribs, configmanager): global rechecker _map_unique_ids(nodeattribs) # for the nodes whose attributes have changed, consider them as potential @@ -673,7 +675,7 @@ def _recheck_nodes_backend(nodeattribs, configmanager): # Now we go through ones we did not find earlier for mac in list(unknown_info): try: - _recheck_single_unknown(configmanager, mac) + await _recheck_single_unknown(configmanager, mac) except Exception: traceback.print_exc() continue @@ -685,19 +687,19 @@ def _recheck_nodes_backend(nodeattribs, configmanager): if info['handler'] is None: next handler = info['handler'].NodeHandler(info, configmanager) - discopool.spawn_n(eval_node, configmanager, handler, info, nodename) + util.spawn(eval_node(configmanager, handler, info, nodename)) except Exception: traceback.print_exc() log.log({'error': 'Unexpected error during discovery of {0}, check debug ' 'logs'.format(nodename)}) -def _recheck_single_unknown(configmanager, mac): +async def _recheck_single_unknown(configmanager, mac): info = unknown_info.get(mac, None) - _recheck_single_unknown_info(configmanager, info) + await _recheck_single_unknown_info(configmanager, info) -def _recheck_single_unknown_info(configmanager, info): +async def _recheck_single_unknown_info(configmanager, info): global rechecker global rechecktime if not info or info['handler'] is None: @@ -728,12 +730,12 @@ def _recheck_single_unknown_info(configmanager, info): if rechecker is not None and rechecktime > util.monotonic_time() + 300: rechecker.cancel() # if cancel did not result in dead, then we are in progress - if rechecker is None or rechecker.dead: + if rechecker is None or rechecker.done(): rechecktime = util.monotonic_time() + 300 rechecker = util.spawn_after(300, _periodic_recheck, configmanager) return - nodename, info['maccount'] = get_nodename(configmanager, handler, info) + nodename, info['maccount'] = await get_nodename(configmanager, handler, info) if nodename: if handler.https_supported: dp = configmanager.get_node_attributes([nodename], @@ -745,27 +747,30 @@ def _recheck_single_unknown_info(configmanager, info): known_nodes[nodename][info['hwaddr']] = info info['discostatus'] = 'discovered' return # already known, no need for more - discopool.spawn_n(eval_node, configmanager, handler, info, nodename) + util.spawn(eval_node(configmanager, handler, info, nodename)) def safe_detected(info): + print(repr(info['services'])) if 'hwaddr' not in info or not info['hwaddr']: + print("No mac!!!") return if info['hwaddr'] in runningevals: + print("Already mac!!!") # Do not evaluate the same mac multiple times at once return - runningevals[info['hwaddr']] = discopool.spawn(eval_detected, info) + runningevals[info['hwaddr']] = util.spawn(eval_detected(info)) -def eval_detected(info): +async def eval_detected(info): try: - detected(info) + await detected(info) except Exception as e: traceback.print_exc() del runningevals[info['hwaddr']] -def detected(info): +async def detected(info): global rechecker global rechecktime if not cfm.config_is_ready(): @@ -783,7 +788,7 @@ def detected(info): return if (handler and not handler.NodeHandler.adequate(info) and info.get('protocol', None)): - eventlet.spawn_after(10, info['protocol'].fix_info, info, + util.spawn_after(10, info['protocol'].fix_info, info, safe_detected) return if info['hwaddr'] in known_info and 'addresses' in info: @@ -816,7 +821,9 @@ def detected(info): cfg = cfm.ConfigManager(None) if handler: handler = handler.NodeHandler(info, cfg) - handler.scan() + res = handler.scan() + if inspect.isawaitable(res): + await res try: if 'modelnumber' not in info: info['modelnumber'] = info['attributes']['enclosure-machinetype-model'][0] @@ -858,7 +865,7 @@ def detected(info): )}) if rechecker is not None and rechecktime > util.monotonic_time() + 300: rechecker.cancel() - if rechecker is None or rechecker.dead: + if rechecker is None or rechecker.done(): rechecktime = util.monotonic_time() + 300 rechecker = util.spawn_after(300, _periodic_recheck, cfg) unknown_info[info['hwaddr']] = info @@ -866,7 +873,7 @@ def detected(info): #TODO, eventlet spawn after to recheck sooner, or somehow else # influence periodic recheck to shorten delay? return - nodename, info['maccount'] = get_nodename(cfg, handler, info) + nodename, info['maccount'] = await get_nodename(cfg, handler, info) if nodename and handler and handler.https_supported: dp = cfg.get_node_attributes([nodename], ('pubkeys.tls_hardwaremanager', 'id.uuid', 'discovery.policy')) @@ -893,7 +900,7 @@ def detected(info): #for now defer probe until inside eval_node. We might not have #a nodename without probe in the future. if nodename and handler: - eval_node(cfg, handler, info, nodename) + await eval_node(cfg, handler, info, nodename) elif handler: #log.log( # {'info': 'Detected unknown {0} with hwaddr {1} at ' @@ -1038,7 +1045,7 @@ def get_nodename_sysdisco(cfg, handler, info): return nl[0] -def get_nodename(cfg, handler, info): +async def get_nodename(cfg, handler, info): nodename = None maccount = None info['verified'] = False @@ -1080,7 +1087,7 @@ def get_nodename(cfg, handler, info): if not nodename: # as a last resort, search switches for info # This is the slowest potential operation, so we hope for the # best to occur prior to this - nodename, macinfo = macmap.find_nodeinfo_by_mac(info['hwaddr'], cfg) + nodename, macinfo = await macmap.find_nodeinfo_by_mac(info['hwaddr'], cfg) maccount = macinfo['maccount'] if nodename: if handler.devname == 'SMM': @@ -1198,7 +1205,7 @@ def search_smms_by_cert(currsmm, cert, cfg): return search_smms_by_cert(exnl[0], cert, cfg) -def eval_node(cfg, handler, info, nodename, manual=False): +async def eval_node(cfg, handler, info, nodename, manual=False): try: handler.probe() # unicast interrogation as possible to get more data # switch concurrently @@ -1237,7 +1244,7 @@ def eval_node(cfg, handler, info, nodename, manual=False): info['verfied'] = True info['enclosure.bay'] = match[1] if match[2]: - if not discover_node(cfg, handler, info, match[2], manual): + if not await discover_node(cfg, handler, info, match[2], manual): pending_nodes[match[2]] = info return if 'enclosure.bay' not in info: @@ -1304,7 +1311,7 @@ def eval_node(cfg, handler, info, nodename, manual=False): info['discostatus'] = 'unidentified' return nodename = nl[0] - if not discover_node(cfg, handler, info, nodename, manual): + if not await discover_node(cfg, handler, info, nodename, manual): # store it as pending, assuming blocked on enclosure # assurance... pending_nodes[nodename] = info @@ -1324,7 +1331,7 @@ def eval_node(cfg, handler, info, nodename, manual=False): fprints = macmap.get_node_fingerprints(nodename, cfg) for fprint in fprints: if util.cert_matches(fprint[0], handler.https_cert): - if not discover_node(cfg, handler, info, + if not await discover_node(cfg, handler, info, nodename, manual): pending_nodes[nodename] = info return @@ -1336,11 +1343,11 @@ def eval_node(cfg, handler, info, nodename, manual=False): 'switch.'.format(nodename, handler.devname) log.log({'error': errorstr}) return - if not discover_node(cfg, handler, info, nodename, manual): + if not await discover_node(cfg, handler, info, nodename, manual): pending_nodes[nodename] = info -def discover_node(cfg, handler, info, nodename, manual): +async def discover_node(cfg, handler, info, nodename, manual): if manual: if not cfg.is_node(nodename): raise exc.InvalidArgumentException( @@ -1449,15 +1456,14 @@ def discover_node(cfg, handler, info, nodename, manual): log.log({'error': 'Unable to get BMC address for {0]'.format(nodename)}) else: bmcaddr = bmcaddr.split('/', 1)[0] - wait_for_connection(bmcaddr) - socket.getaddrinfo(bmcaddr, 443) + await wait_for_connection(bmcaddr) subprocess.check_call(['/opt/confluent/bin/nodeconfig', nodename] + nodeconfig) log.log({'info': 'Configured {0} ({1})'.format(nodename, handler.devname)}) info['discostatus'] = 'discovered' for i in pending_by_uuid.get(curruuid, []): - eventlet.spawn_n(_recheck_single_unknown_info, cfg, i) + util.spawn(_recheck_single_unknown_info(cfg, i)) try: del pending_by_uuid[curruuid] except KeyError: @@ -1545,7 +1551,7 @@ async def _handle_nodelist_change(configmanager): await _recheck_nodes((), configmanager) if needaddhandled: needaddhandled = False - nodeaddhandler = eventlet.spawn(_handle_nodelist_change, configmanager) + nodeaddhandler = util.spawn(_handle_nodelist_change(configmanager)) else: nodeaddhandler = None @@ -1579,7 +1585,7 @@ async def newnodes(added, deleting, renamed, configmanager): rechecker = None rechecktime = None -rechecklock = eventlet.semaphore.Semaphore() +rechecklock = asyncio.Lock() async def _periodic_recheck(configmanager): global rechecker @@ -1599,31 +1605,35 @@ async def _periodic_recheck(configmanager): configmanager) -def rescan(): +async def rescan(): _map_unique_ids() global scanner if scanner: return else: - scanner = eventlet.spawn(blocking_scan) - remotescan() + print("begin") + scanner = util.spawn(blocking_scan()) + print("bg") + await remotescan() -def remotescan(): +async def remotescan(): mycfm = cfm.ConfigManager(None) myname = collective.get_myname() for remagent in get_subscriptions(): try: - affluent.renotify_me(remagent, mycfm, myname) + await affluent.renotify_me(remagent, mycfm, myname) except Exception as e: log.log({'error': 'Unexpected problem asking {} for discovery notifications'.format(remagent)}) -def blocking_scan(): +async def blocking_scan(): global scanner - slpscan = eventlet.spawn(slp.active_scan, safe_detected, slp) - ssdpscan = eventlet.spawn(ssdp.active_scan, safe_detected, ssdp) - slpscan.wait() - ssdpscan.wait() + slpscan = util.spawn(slp.active_scan(safe_detected, slp)) + #ssdpscan = eventlet.spawn(ssdp.active_scan, safe_detected, ssdp) + print("beign slpscan") + await slpscan + print("end scan") + #ssdpscan.wait() scanner = None def start_detection(): @@ -1644,7 +1654,7 @@ def start_detection(): if rechecker is None: rechecktime = util.monotonic_time() + 900 rechecker = util.spawn_after(900, _periodic_recheck, cfg) - eventlet.spawn_n(ssdp.snoop, safe_detected, None, ssdp, get_node_by_uuid_or_mac) + #eventlet.spawn(ssdp.snoop(safe_detected, None, ssdp, get_node_by_uuid_or_mac)) def stop_autosense(): for watcher in list(autosensors): @@ -1652,10 +1662,10 @@ def stop_autosense(): autosensors.discard(watcher) def start_autosense(): - autosensors.add(eventlet.spawn(slp.snoop, safe_detected, slp)) + autosensors.add(util.spawn(slp.snoop(safe_detected, slp))) #autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns)) - autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid)) - eventlet.spawn(remotescan) + #autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid)) + util.spawn(remotescan()) nodes_by_fprint = {} @@ -1699,7 +1709,10 @@ def _map_unique_ids(nodes=None): nodes_by_fprint[fprint] = node -if __name__ == '__main__': +async def main(): start_detection() while True: - eventlet.sleep(30) + await asyncio.sleep(30) + +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_server/confluent/discovery/handlers/bmc.py b/confluent_server/confluent/discovery/handlers/bmc.py index a9ed315a..c3a2b6bd 100644 --- a/confluent_server/confluent/discovery/handlers/bmc.py +++ b/confluent_server/confluent/discovery/handlers/bmc.py @@ -15,24 +15,19 @@ import confluent.discovery.handlers.generic as generic import confluent.exceptions as exc import confluent.netutil as netutil -import eventlet.support.greendns # Provide foundation for general IPMI device configuration -import pyghmi.exceptions as pygexc -ipmicommand = eventlet.import_patched('pyghmi.ipmi.command') -ipmicommand.session.select = eventlet.green.select -ipmicommand.session.threading = eventlet.green.threading -ipmicommand.session.socket.getaddrinfo = eventlet.support.greendns.getaddrinfo -getaddrinfo = eventlet.support.greendns.getaddrinfo - +import aiohmi.exceptions as pygexc +import aiohmi.ipmi.command as ipmicommand +import socket class NodeHandler(generic.NodeHandler): DEFAULT_USER = 'USERID' DEFAULT_PASS = 'PASSW0RD' - def _get_ipmicmd(self, user=None, password=None): + async def _get_ipmicmd(self, user=None, password=None): priv = None if user is None or password is None: if self.trieddefault: @@ -42,7 +37,7 @@ class NodeHandler(generic.NodeHandler): user = self.DEFAULT_USER if password is None: password = self.DEFAULT_PASS - return ipmicommand.Command(self.ipaddr, user, password, + return await ipmicommand.create(self.ipaddr, user, password, privlevel=priv, keepalive=False) def __init__(self, info, configmanager): @@ -56,7 +51,7 @@ class NodeHandler(generic.NodeHandler): def config(self, nodename, reset=False): self._bmcconfig(nodename, reset) - def _bmcconfig(self, nodename, reset=False, customconfig=None, vc=None): + async def _bmcconfig(self, nodename, reset=False, customconfig=None, vc=None): # TODO(jjohnson2): set ip parameters, user/pass, alert cfg maybe # In general, try to use https automation, to make it consistent # between hypothetical secure path and today. @@ -69,7 +64,7 @@ class NodeHandler(generic.NodeHandler): passwd = creds.get(nodename, {}).get( 'secret.hardwaremanagementpassword', {}).get('value', None) try: - ic = self._get_ipmicmd() + ic = await self._get_ipmicmd() passwd = self.DEFAULT_PASS except pygexc.IpmiException as pi: havecustomcreds = False @@ -82,14 +77,14 @@ class NodeHandler(generic.NodeHandler): else: passwd = self.DEFAULT_PASS if havecustomcreds: - ic = self._get_ipmicmd(user, passwd) + ic = await self._get_ipmicmd(user, passwd) else: raise if vc: ic.register_key_handler(vc) - currusers = ic.get_users() - lanchan = ic.get_network_channel() - userdata = ic.xraw_command(netfn=6, command=0x44, data=(lanchan, + currusers = await ic.get_users() + lanchan = await ic.get_network_channel() + userdata = await ic.xraw_command(netfn=6, command=0x44, data=(lanchan, 1)) userdata = bytearray(userdata['data']) maxusers = userdata[0] & 0b111111 @@ -114,7 +109,7 @@ class NodeHandler(generic.NodeHandler): newuserslot = uid if newpass != passwd: # don't mess with existing if no change ic.set_user_password(newuserslot, password=newpass) - ic = self._get_ipmicmd(user, passwd) + ic = await self._get_ipmicmd(user, passwd) if vc: ic.register_key_handler(vc) break @@ -126,7 +121,7 @@ class NodeHandler(generic.NodeHandler): ic.set_user_password(newuserslot, password=newpass) ic.set_user_name(newuserslot, newuser) if havecustomcreds: - ic = self._get_ipmicmd(user, passwd) + ic = await self._get_ipmicmd(user, passwd) if vc: ic.register_key_handler(vc) #We are remote operating on the account we are @@ -161,7 +156,7 @@ class NodeHandler(generic.NodeHandler): 'fe80::')): newip = cd['hardwaremanagement.manager']['value'] newip = newip.split('/', 1)[0] - newipinfo = getaddrinfo(newip, 0)[0] + newipinfo = socket.getaddrinfo(newip, 0)[0] # This getaddrinfo is repeated in get_nic_config, could be # optimized, albeit with a more convoluted api.. newip = newipinfo[-1][0] diff --git a/confluent_server/confluent/discovery/handlers/generic.py b/confluent_server/confluent/discovery/handlers/generic.py index 2e941238..e18f944d 100644 --- a/confluent_server/confluent/discovery/handlers/generic.py +++ b/confluent_server/confluent/discovery/handlers/generic.py @@ -14,9 +14,8 @@ import confluent.util as util import errno -import eventlet import socket -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.util.webclient as webclient class NodeHandler(object): https_supported = True @@ -37,6 +36,7 @@ class NodeHandler(object): self.relay_server = None self.web_ip = None self.web_port = None + self.https_cert = None # if this is a remote registered component, prefer to use the agent forwarder if info.get('forwarder_url', False): self.relay_url = info['forwarder_url'] @@ -114,14 +114,13 @@ class NodeHandler(object): elif self._certfailreason == 2: return 'unreachable' - @property - def https_cert(self): + async def get_https_cert(self): if self._fp: return self._fp - ip, port = self.get_web_port_and_ip() - wc = webclient.SecureHTTPConnection(ip, verifycallback=self._savecert, port=port) + ip, port = await self.get_web_port_and_ip() + wc = webclient.WebConnection(ip, verifycallback=self._savecert, port=port) try: - wc.connect() + await wc.request('GET', '/') except IOError as ie: if ie.errno == errno.ECONNREFUSED: self._certfailreason = 1 @@ -134,16 +133,17 @@ class NodeHandler(object): except Exception: self._certfailreason = 2 return None + self.https_cert = self._fp return self._fp - def get_web_port_and_ip(self): + async def get_web_port_and_ip(self): if self.web_ip: return self.web_ip, self.web_port # get target ip and port, either direct or relay as applicable if self.relay_url: kv = util.TLSCertVerifier(self.configmanager, self.relay_server, 'pubkeys.tls_hardwaremanager').verify_cert - w = webclient.SecureHTTPConnection(self.relay_server, verifycallback=kv) + w = webclient.WebConnection(self.relay_server, verifycallback=kv) relaycreds = self.configmanager.get_node_attributes(self.relay_server, 'secret.*', decrypt=True) relaycreds = relaycreds.get(self.relay_server, {}) relayuser = relaycreds.get('secret.hardwaremanagementuser', {}).get('value', None) @@ -151,8 +151,7 @@ class NodeHandler(object): if not relayuser or not relaypass: raise Exception('No credentials for {0}'.format(self.relay_server)) w.set_basic_credentials(relayuser, relaypass) - w.connect() - w.request('GET', self.relay_url) + await w.request('GET', self.relay_url) r = w.getresponse() rb = r.read() if r.code != 302: diff --git a/confluent_server/confluent/discovery/handlers/imm.py b/confluent_server/confluent/discovery/handlers/imm.py index 35765a3f..7e0ef7d5 100644 --- a/confluent_server/confluent/discovery/handlers/imm.py +++ b/confluent_server/confluent/discovery/handlers/imm.py @@ -73,7 +73,7 @@ class NodeHandler(bmchandler.NodeHandler): if slot != 0: self.info['enclosure.bay'] = slot - def probe(self): + async def probe(self): if self.info.get('enclosure.bay', 0) == 0: self.scan() if self.info.get('enclosure.bay', 0) != 0: @@ -85,8 +85,8 @@ class NodeHandler(bmchandler.NodeHandler): try: # we are a dense platform, but the SLP data did not give us slot # attempt to probe using IPMI - ipmicmd = self._get_ipmicmd() - guiddata = ipmicmd.xraw_command(netfn=6, command=8) + ipmicmd = await self._get_ipmicmd() + guiddata = await ipmicmd.xraw_command(netfn=6, command=8) self.info['uuid'] = pygutil.decode_wireformat_uuid( guiddata['data']).lower() ipmicmd.oem_init() diff --git a/confluent_server/confluent/discovery/handlers/tsm.py b/confluent_server/confluent/discovery/handlers/tsm.py index d5fde8c1..478360a1 100644 --- a/confluent_server/confluent/discovery/handlers/tsm.py +++ b/confluent_server/confluent/discovery/handlers/tsm.py @@ -12,21 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import confluent.discovery.handlers.generic as generic import confluent.exceptions as exc import confluent.netutil as netutil import confluent.util as util -import eventlet -import eventlet.support.greendns -import json -try: - from urllib import urlencode -except ImportError: - from urllib.parse import urlencode +import socket +from urllib.parse import urlencode -getaddrinfo = eventlet.support.greendns.getaddrinfo -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.util.webclient as webclient class NodeHandler(generic.NodeHandler): devname = 'TSM' @@ -45,9 +40,11 @@ class NodeHandler(generic.NodeHandler): self.atdefault = True super(NodeHandler, self).__init__(info, configmanager) - def scan(self): - c = webclient.SecureHTTPConnection(self.ipaddr, 443, verifycallback=self.validate_cert) - i = c.grab_json_response('/redfish/v1/') + async def scan(self): + await self.get_https_cert() + c = webclient.WebConnection( + self.ipaddr, 443, verifycallback=self.validate_cert) + i = await c.grab_json_response('/redfish/v1/') uuid = i.get('UUID', None) if uuid: self.info['uuid'] = uuid.lower() @@ -58,20 +55,21 @@ class NodeHandler(generic.NodeHandler): fprint = util.get_fingerprint(self.https_cert) return util.cert_matches(fprint, certificate) - def _get_wc(self): + async def _get_wc(self): authdata = { # start by trying factory defaults 'username': self.DEFAULT_USER, 'password': self.DEFAULT_PASS, } - wc = webclient.SecureHTTPConnection(self.ipaddr, 443, verifycallback=self.validate_cert) + await self.get_https_cert() + wc = webclient.WebConnection(self.ipaddr, 443, verifycallback=self.validate_cert) wc.set_header('Content-Type', 'application/json') authmode = 0 if not self.trieddefault: - rsp, status = wc.grab_json_response_with_status('/api/session', authdata) + rsp, status = await wc.grab_json_response_with_status('/api/session', authdata) if status == 403: wc.set_header('Content-Type', 'application/x-www-form-urlencoded') authmode = 1 - rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata)) + rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata)) else: authmode = 2 if status > 400: @@ -101,19 +99,19 @@ class NodeHandler(generic.NodeHandler): rpasschange, method='PATCH') if status >= 200 and status < 300: authdata['password'] = self.targpass - eventlet.sleep(10) + await asyncio.sleep(10) else: if b'[web.lua] Error in RequestHandler, thread' in rsp: - rsp, status = wc.grab_json_response_with_status('/api/reset-pass', passchange) + rsp, status = await wc.grab_json_response_with_status('/api/reset-pass', passchange) else: raise Exception("Redfish may not have been ready yet" + repr(rsp)) else: - rsp, status = wc.grab_json_response_with_status('/api/reset-pass', urlencode(passchange)) + rsp, status = await wc.grab_json_response_with_status('/api/reset-pass', urlencode(passchange)) authdata['password'] = self.targpass if authmode == 2: - rsp, status = wc.grab_json_response_with_status('/api/session', authdata) + rsp, status = await wc.grab_json_response_with_status('/api/session', authdata) else: - rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata)) + rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata)) self.csrftok = rsp['CSRFToken'] self.channel = rsp['channel'] self.curruser = self.DEFAULT_USER @@ -129,10 +127,10 @@ class NodeHandler(generic.NodeHandler): authdata['username'] = self.curruser authdata['password'] = self.currpass if authmode != 1: - rsp, status = wc.grab_json_response_with_status('/api/session', authdata) + rsp, status = await wc.grab_json_response_with_status('/api/session', authdata) if authmode == 1 or status == 403: wc.set_header('Content-Type', 'application/x-www-form-urlencoded') - rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata)) + rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata)) if status != 200: return None self.csrftok = rsp['CSRFToken'] @@ -141,10 +139,10 @@ class NodeHandler(generic.NodeHandler): authdata['username'] = self.targuser authdata['password'] = self.targpass if authmode != 1: - rsp, status = wc.grab_json_response_with_status('/api/session', authdata) + rsp, status = await wc.grab_json_response_with_status('/api/session', authdata) if authmode == 1 or status == 403: wc.set_header('Content-Type', 'application/x-www-form-urlencoded') - rsp, status = wc.grab_json_response_with_status('/api/session', urlencode(authdata)) + rsp, status = await wc.grab_json_response_with_status('/api/session', urlencode(authdata)) if status != 200: return None self.curruser = self.targuser @@ -153,7 +151,7 @@ class NodeHandler(generic.NodeHandler): self.channel = rsp['channel'] return wc - def config(self, nodename): + async def config(self, nodename): self.nodename = nodename creds = self.configmanager.get_node_attributes( nodename, ['secret.hardwaremanagementuser', @@ -167,7 +165,7 @@ class NodeHandler(generic.NodeHandler): passwd = util.stringify(passwd) self.targuser = user self.targpass = passwd - wc = self._get_wc() + wc = await self._get_wc() wc.set_header('X-CSRFTOKEN', self.csrftok) curruserinfo = {} authupdate = False @@ -202,7 +200,7 @@ class NodeHandler(generic.NodeHandler): 'fe80::')): newip = cd['hardwaremanagement.manager']['value'] newip = newip.split('/', 1)[0] - newipinfo = getaddrinfo(newip, 0)[0] + newipinfo = socket.getaddrinfo(newip, 0)[0] newip = newipinfo[-1][0] if ':' in newip: raise exc.NotImplementedException('IPv6 remote config TODO') @@ -239,7 +237,7 @@ def remote_nodecfg(nodename, cfm): ipaddr = cfg.get(nodename, {}).get('hardwaremanagement.manager', {}).get( 'value', None) ipaddr = ipaddr.split('/', 1)[0] - ipaddr = getaddrinfo(ipaddr, 0)[0][-1] + ipaddr = socket.getaddrinfo(ipaddr, 0)[0][-1] if not ipaddr: raise Exception('Cannot remote configure a system without known ' 'address') @@ -254,4 +252,4 @@ if __name__ == '__main__': info = {'addresses': [[sys.argv[1]]] } print(repr(info)) testr = NodeHandler(info, c) - testr.config(sys.argv[2]) \ No newline at end of file + testr.config(sys.argv[2]) diff --git a/confluent_server/confluent/discovery/handlers/xcc.py b/confluent_server/confluent/discovery/handlers/xcc.py index ff7ca042..6ea41230 100644 --- a/confluent_server/confluent/discovery/handlers/xcc.py +++ b/confluent_server/confluent/discovery/handlers/xcc.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import base64 import codecs import confluent.discovery.handlers.imm as immhandler @@ -19,15 +20,12 @@ import confluent.exceptions as exc import confluent.netutil as netutil import confluent.util as util import errno -import eventlet -import eventlet.support.greendns import json import os -import pyghmi.exceptions as pygexc -import eventlet.green.socket as socket -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.exceptions as pygexc +import socket +import aiohmi.util.webclient as webclient import struct -getaddrinfo = eventlet.support.greendns.getaddrinfo def fixuuid(baduuid): @@ -39,7 +37,8 @@ def fixuuid(baduuid): uuid = (a[:8], a[8:12], a[12:16], baduuid[19:23], baduuid[24:]) return '-'.join(uuid).lower() -class LockedUserException(Exception): + +class LockedUserException(BaseException): pass @@ -88,8 +87,8 @@ class NodeHandler(immhandler.NodeHandler): def probe(self): return None - def scan(self): - ip, port = self.get_web_port_and_ip() + async def scan(self): + ip, port = await self.get_web_port_and_ip() c = webclient.SecureHTTPConnection(ip, port, verifycallback=self.validate_cert) i = c.grab_json_response('/api/providers/logoninfo') @@ -131,7 +130,7 @@ class NodeHandler(immhandler.NodeHandler): if slot != 0: self.info['enclosure.bay'] = slot - def preconfig(self, possiblenode): + async def preconfig(self, possiblenode): self.tmpnodename = possiblenode ff = self.info.get('attributes', {}).get('enclosure-form-factor', '') if ff not in ('dense-computing', [u'dense-computing']): @@ -152,7 +151,7 @@ class NodeHandler(immhandler.NodeHandler): disableipmi = False if currfirm >= 3: # IPMI is disabled and we need it, also we need to go to *some* password - wc = self.wc + wc = await self.get_wc() if not wc: # We cannot try to enable SMM here without risking real credentials # on the wire to untrusted parties @@ -195,7 +194,7 @@ class NodeHandler(immhandler.NodeHandler): fprint = util.get_fingerprint(self.https_cert) return util.cert_matches(fprint, certificate) - def get_webclient(self, username, password, newpassword): + async def get_webclient(self, username, password, newpassword): wc = self._wc.dupe() try: wc.connect() @@ -293,7 +292,7 @@ class NodeHandler(immhandler.NodeHandler): if pwdchanged: # Remove the minimum change interval, to allow sane # password changes after provisional changes - wc = self.wc + wc = await self.get_wc() self.set_password_policy('', wc) return (wc, pwdchanged) elif rspdata.get('locktime', 0) > 0: @@ -301,8 +300,7 @@ class NodeHandler(immhandler.NodeHandler): 'The user "{0}" has been locked out by too many incorrect password attempts'.format(username)) return (None, rspdata) - @property - def wc(self): + async def get_wc(self): passwd = None isdefault = True errinfo = {} @@ -319,7 +317,7 @@ class NodeHandler(immhandler.NodeHandler): nodename = None inpreconfig = True if self._currcreds[0] is not None: - wc, pwdchanged = self.get_webclient(self._currcreds[0], self._currcreds[1], None) + wc, pwdchanged = await self.get_webclient(self._currcreds[0], self._currcreds[1], None) if wc: return wc if nodename: @@ -342,7 +340,7 @@ class NodeHandler(immhandler.NodeHandler): # (TempW0rd42) passwd = 'TempW0rd42' try: - wc, pwdchanged = self.get_webclient('USERID', 'PASSW0RD', passwd) + wc, pwdchanged = await self.get_webclient('USERID', 'PASSW0RD', passwd) except LockedUserException as lue: wc = None pwdchanged = 'The user "USERID" has been locked out by too many incorrect password attempts' @@ -363,11 +361,11 @@ class NodeHandler(immhandler.NodeHandler): if self.tmppasswd: if savedexc: raise savedexc - wc, errinfo = self.get_webclient('USERID', self.tmppasswd, passwd) + wc, errinfo = await self.get_webclient('USERID', self.tmppasswd, passwd) else: if user == 'USERID' and savedexc: raise savedexc - wc, errinfo = self.get_webclient(user, passwd, None) + wc, errinfo = await self.get_webclient(user, passwd, None) if wc: return wc else: @@ -408,7 +406,7 @@ class NodeHandler(immhandler.NodeHandler): if user['users_user_name'] == '': return user['users_user_id'] - def _setup_xcc_account(self, username, passwd, wc): + async def _setup_xcc_account(self, username, passwd, wc): userinfo = wc.grab_json_response('/api/dataset/imm_users') uid = None for user in userinfo['items'][0]['users']: @@ -449,7 +447,7 @@ class NodeHandler(immhandler.NodeHandler): if status != 200: rsp = json.loads(rsp) if rsp.get('error', {}).get('code', 'Unknown') in ('Base.1.8.GeneralError', 'Base.1.12.GeneralError', 'Base.1.14.GeneralError'): - eventlet.sleep(4) + await asyncio.sleep(4) else: break self.tmppasswd = None @@ -459,7 +457,7 @@ class NodeHandler(immhandler.NodeHandler): wc.grab_json_response('/api/providers/logout') self._currcreds = (username, passwd) - def _convert_sha256account(self, user, passwd, wc): + async def _convert_sha256account(self, user, passwd, wc): # First check if the specified user is sha256... userinfo = wc.grab_json_response('/api/dataset/imm_users') curruser = None @@ -472,7 +470,7 @@ class NodeHandler(immhandler.NodeHandler): break if curruser.get('users_pass_is_sha256', 0): self._wc = None - wc = self.wc + wc = await self.get_wc() nwc = wc.dupe() # Have to convert it for being useful with most Lenovo automation tools # This requires deleting the account entirely and trying again @@ -511,7 +509,7 @@ class NodeHandler(immhandler.NodeHandler): userparams = "{0},{1},{2},1,4,0,0,0,0,,8,".format(curruser['users_user_id'], user, tpass) nwc.grab_json_response('/api/function', {'USER_UserCreate': userparams}) nwc.grab_json_response('/api/providers/logout') - nwc, pwdchanged = self.get_webclient(user, tpass, passwd) + nwc, pwdchanged = await self.get_webclient(user, tpass, passwd) if not nwc: if not pwdchanged: pwdchanged = 'Unknown' @@ -523,11 +521,11 @@ class NodeHandler(immhandler.NodeHandler): nwc.grab_json_response('/api/providers/logout') finally: self._wc = None - wc = self.wc + wc = await self.get_wc() wc.grab_json_response('/api/function', {'USER_UserDelete': "{0},{1}".format(tmpuid, '6pmu0ezczzcp')}) wc.grab_json_response('/api/providers/logout') - def config(self, nodename, reset=False): + async def config(self, nodename, reset=False): self.nodename = nodename cd = self.configmanager.get_node_attributes( nodename, ['secret.hardwaremanagementuser', @@ -546,7 +544,7 @@ class NodeHandler(immhandler.NodeHandler): nodename, 'discovery.passwordrules') strruleset = dpp.get(nodename, {}).get( 'discovery.passwordrules', {}).get('value', '') - wc = self.wc + wc = await self.get_wc() creds = self.configmanager.get_node_attributes( self.nodename, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True) @@ -557,9 +555,9 @@ class NodeHandler(immhandler.NodeHandler): raise Exception( 'Request to use default credentials, but refused by target after it has been changed to {0}'.format(self.tmppasswd)) if not isdefault: - self._setup_xcc_account(user, passwd, wc) - wc = self.wc - self._convert_sha256account(user, passwd, wc) + await self._setup_xcc_account(user, passwd, wc) + wc = await self.get_wc() + await self._convert_sha256account(user, passwd, wc) if (cd.get('hardwaremanagement.method', {}).get('value', 'ipmi') != 'redfish' or cd.get('console.method', {}).get('value', None) == 'ipmi'): nwc = wc.dupe() @@ -587,7 +585,7 @@ class NodeHandler(immhandler.NodeHandler): updateinf, method='PATCH') if targbmc and not targbmc.startswith('fe80::'): newip = targbmc.split('/', 1)[0] - newipinfo = getaddrinfo(newip, 0)[0] + newipinfo = socket.getaddrinfo(newip, 0)[0] newip = newipinfo[-1][0] if ':' in newip: raise exc.NotImplementedException('IPv6 remote config TODO') @@ -607,7 +605,7 @@ class NodeHandler(immhandler.NodeHandler): raise exc.InvalidArgumentException('Will not remotely configure a device with no gateway') wc.grab_json_response('/api/dataset', statargs) elif self.ipaddr.startswith('fe80::'): - self.configmanager.set_node_attributes( + await self.configmanager.set_node_attributes( {nodename: {'hardwaremanagement.manager': self.ipaddr}}) else: raise exc.TargetEndpointUnreachable( @@ -625,7 +623,7 @@ class NodeHandler(immhandler.NodeHandler): 'value', None) # ok, set the uuid of the manager... if em: - self.configmanager.set_node_attributes( + await self.configmanager.set_node_attributes( {em: {'id.uuid': enclosureuuid}}) def remote_nodecfg(nodename, cfm): @@ -634,9 +632,9 @@ def remote_nodecfg(nodename, cfm): ipaddr = cfg.get(nodename, {}).get('hardwaremanagement.manager', {}).get( 'value', None) ipaddr = ipaddr.split('/', 1)[0] - ipaddr = getaddrinfo(ipaddr, 0)[0][-1] + ipaddr = socket.getaddrinfo(ipaddr, 0)[0][-1] if not ipaddr: - raise Excecption('Cannot remote configure a system without known ' + raise Exception('Cannot remote configure a system without known ' 'address') info = {'addresses': [ipaddr]} nh = NodeHandler(info, cfm) diff --git a/confluent_server/confluent/discovery/protocols/slp.py b/confluent_server/confluent/discovery/protocols/slp.py index ac332def..460dba17 100644 --- a/confluent_server/confluent/discovery/protocols/slp.py +++ b/confluent_server/confluent/discovery/protocols/slp.py @@ -14,14 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import confluent.neighutil as neighutil import confluent.util as util import confluent.log as log import os import random -import eventlet.greenpool -import eventlet.green.select as select -import eventlet.green.socket as socket +#import eventlet.green.socket as socket +import socket import struct import traceback @@ -116,6 +116,7 @@ def _parse_slp_packet(packet, peer, rsps, xidmap, defer=None, sock=None): else: probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] try: + sock.setblocking(1) sock.sendto(b'\x00', probepeer) except Exception: return @@ -253,20 +254,51 @@ def _find_srvtype(net, net4, srvtype, addresses, xid): pass -def _grab_rsps(socks, rsps, interval, xidmap, deferrals): - r = None - res = select.select(socks, (), (), interval) - if res: - r = res[0] - while r: - for s in r: - (rsp, peer) = s.recvfrom(9000) - _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, s) - res = select.select(socks, (), (), interval) - if not res: - r = None - else: - r = res[0] +import time + +def sock_read(fut, sock, cloop, allsocks): + if fut.done(): + print("was already done???") + return + if not cloop.remove_reader(sock): + print("Was already removed??") + fut.set_result(sock) + allsocks.discard(sock) + +async def _bulk_recvfrom(socks, timeout): + allsocks = set([]) + cloop = asyncio.get_running_loop() + done = True + while done: + currfutures = [] + for sock in socks: + sock.setblocking(0) + currfut = asyncio.Future() + cloop.add_reader(sock, sock_read, currfut, sock, cloop, allsocks) + currfutures.append(currfut) + done, dumbfutures = await asyncio.wait(currfutures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED) + for sk in allsocks: + cloop.remove_reader(sk) + for dumbfuture in dumbfutures: + dumbfuture.cancel() + socks = [] + for currfut in done: + socks.append(await currfut) + for sock in socks: + sock.setblocking(0) + try: + yield (sock,) + sock.recvfrom(9000) + except socket.error: + print("shouldn't happen...") + continue + print("done recv") + + + +async def _grab_rsps(socks, rsps, interval, xidmap, deferrals): + async for srp in _bulk_recvfrom(socks, interval): + sock, rsp, peer = srp + _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock) @@ -335,16 +367,17 @@ def _parse_attrs(data, parsed, xid=None): parsed['attributes'] = _parse_attrlist(attrstr) -def fix_info(info, handler): +async def fix_info(info, handler): if '_attempts' not in info: info['_attempts'] = 10 if info['_attempts'] == 0: return info['_attempts'] -= 1 - _add_attributes(info) + await _add_attributes(info) handler(info) -def _add_attributes(parsed): + +async def _add_attributes(parsed): xid = parsed.get('xid', 42) attrq = _generate_attr_request(parsed['services'][0], xid) target = None @@ -360,14 +393,16 @@ def _add_attributes(parsed): net = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) else: net = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + cloop = asyncio.get_running_loop() try: - net.settimeout(2.0) - net.connect(target) - except socket.error: + net.settimeout(0) + net.setblocking(0) + await asyncio.wait_for(cloop.sock_connect(net, target), 2.0) + except (socket.error, asyncio.exceptions.TimeoutError) as te: return try: - net.sendall(attrq) - rsp = net.recv(8192) + await cloop.sock_sendall(net, attrq) + rsp = await cloop.sock_recv(net, 8192) net.close() _parse_attrs(rsp, parsed, xid) except Exception as e: @@ -417,9 +452,9 @@ def query_srvtypes(target): stypes = payload[4:4+stypelen].decode('utf-8') return stypes.split(',') -def rescan(handler): +async def rescan(handler): known_peers = set([]) - for scanned in scan(): + async for scanned in scan(): for addr in scanned['addresses']: if addr in known_peers: break @@ -431,7 +466,7 @@ def rescan(handler): handler(scanned) -def snoop(handler, protocol=None): +async def snoop(handler, protocol=None): """Watch for SLP activity handler will be called with a dictionary of relevant attributes @@ -441,10 +476,10 @@ def snoop(handler, protocol=None): """ tracelog = log.Logger('trace') try: - active_scan(handler, protocol) + await active_scan(handler, protocol) except Exception as e: - tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) + tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) net = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) net.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) slpg = socket.inet_pton(socket.AF_INET6, 'ff01::123') @@ -475,7 +510,6 @@ def snoop(handler, protocol=None): while True: try: newmacs = set([]) - r, _, _ = select.select((net, net4), (), (), 60) # clear known_peers and peerbymacaddress # to avoid stale info getting in... # rely upon the select(0.2) to catch rapid fire and aggregate ip @@ -485,29 +519,34 @@ def snoop(handler, protocol=None): known_peers = set([]) peerbymacaddress = {} deferpeers = [] - while r and len(deferpeers) < 256: - for s in r: - (rsp, peer) = s.recvfrom(9000) + timeo = 60 + rdy = True + while rdy and len(deferpeers) < 256: + rdy = False + async for srp in _bulk_recvfrom((net, net4), timeo): + rdy = True + s, rsp, peer = srp if peer in known_peers: continue mac = neighutil.get_hwaddr(peer[0]) if not mac: probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] try: + s.setblocking(1) s.sendto(b'\x00', probepeer) except Exception: continue deferpeers.append(peer) continue process_peer(newmacs, known_peers, peerbymacaddress, peer) - r, _, _ = select.select((net, net4), (), (), 0.2) + timeo = 0.2 if deferpeers: - eventlet.sleep(2.2) + await asyncio.sleep(2.2) for peer in deferpeers: process_peer(newmacs, known_peers, peerbymacaddress, peer) for mac in newmacs: peerbymacaddress[mac]['xid'] = 1 - _add_attributes(peerbymacaddress[mac]) + await _add_attributes(peerbymacaddress[mac]) peerbymacaddress[mac]['hwaddr'] = mac peerbymacaddress[mac]['protocol'] = protocol for srvurl in peerbymacaddress[mac].get('urls', ()): @@ -515,6 +554,7 @@ def snoop(handler, protocol=None): srvurl = srvurl[:-3] if srvurl.endswith('://Athena:'): continue + print(repr(peerbymacaddress[mac])) if 'service:ipmi' in peerbymacaddress[mac]['services']: continue if 'service:lightttpd' in peerbymacaddress[mac]['services']: @@ -560,13 +600,14 @@ def process_peer(newmacs, known_peers, peerbymacaddress, peer): newmacs.add(mac) -def active_scan(handler, protocol=None): +async def active_scan(handler, protocol=None): known_peers = set([]) toprocess = [] # Implement a warmup, inducing neighbor table activity # by kernel and giving 2 seconds for a retry or two if # needed - for scanned in scan(): + async for scanned in scan(): + print('fun with: ' + repr(scanned['services'])) for addr in scanned['addresses']: if addr in known_peers: break @@ -581,7 +622,7 @@ def active_scan(handler, protocol=None): handler(scanned) -def scan(srvtypes=_slp_services, addresses=None, localonly=False): +async def scan(srvtypes=_slp_services, addresses=None, localonly=False): """Find targets providing matching requested srvtypes This is a generator that will iterate over respondants to the SrvType @@ -613,23 +654,26 @@ def scan(srvtypes=_slp_services, addresses=None, localonly=False): # processed, mitigating volume of response traffic rsps = {} deferrals = [] + print('commence') for srvtype in srvtypes: xididx += 1 _find_srvtype(net, net4, srvtype, addresses, initxid + xididx) xidmap[initxid + xididx] = srvtype - _grab_rsps((net, net4), rsps, 0.1, xidmap, deferrals) + await _grab_rsps((net, net4), rsps, 0.1, xidmap, deferrals) # now do a more slow check to work to get stragglers, # but fortunately the above should have taken the brunt of volume, so # reduced chance of many responses overwhelming receive buffer. - _grab_rsps((net, net4), rsps, 1, xidmap, deferrals) + print('waity') + await _grab_rsps((net, net4), rsps, 1, xidmap, deferrals) + print(len(rsps)) if deferrals: - eventlet.sleep(1.2) # already have a one second pause from select above + await asyncio.sleep(1.2) # already have a one second pause from select above for defer in deferrals: rsp, peer = defer _parse_slp_packet(rsp, peer, rsps, xidmap) # now to analyze and flesh out the responses handleids = set([]) - gp = eventlet.greenpool.GreenPool(128) + tsks = [] for id in rsps: for srvurl in rsps[id].get('urls', ()): if len(srvurl) > 4: @@ -644,9 +688,10 @@ def scan(srvtypes=_slp_services, addresses=None, localonly=False): break else: continue - gp.spawn_n(_add_attributes, rsps[id]) + tsks.append(util.spawn(_add_attributes(rsps[id]))) handleids.add(id) - gp.waitall() + if tsks: + await asyncio.wait(tsks) for id in handleids: if 'service:lighttpd' in rsps[id]['services']: currinf = rsps[id] diff --git a/confluent_server/confluent/networking/macmap.py b/confluent_server/confluent/networking/macmap.py index cf6012c5..c2f39e91 100644 --- a/confluent_server/confluent/networking/macmap.py +++ b/confluent_server/confluent/networking/macmap.py @@ -42,12 +42,12 @@ if __name__ == '__main__': import confluent.config.configmanager as cfm import confluent.snmputil as snmp - +import asyncio from confluent.networking.lldp import _handle_neighbor_query, get_fingerprint from confluent.networking.netutil import get_switchcreds, list_switches, get_portnamemap import eventlet.green.select as select -import eventlet.green.socket as socket +import socket import confluent.collective.manager as collective import confluent.exceptions as exc @@ -55,7 +55,6 @@ import confluent.log as log import confluent.messages as msg import confluent.noderange as noderange import confluent.util as util -from eventlet.greenpool import GreenPool import eventlet.green.subprocess as subprocess import fcntl import eventlet @@ -63,7 +62,7 @@ import eventlet.semaphore import msgpack import random import re -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.util.webclient as webclient noaffluent = set([]) @@ -124,9 +123,9 @@ def _namesmatch(switchdesc, userdesc): return True return False -def _map_switch(args): +async def _map_switch(args): try: - return _map_switch_backend(args) + return await _map_switch_backend(args) except (UnicodeError, socket.gaierror): log.log({'error': "Cannot resolve switch '{0}' to an address".format( args[0])}) @@ -152,11 +151,11 @@ def _nodelookup(switch, ifname): return None -def _affluent_map_switch(args): +async def _affluent_map_switch(args): switch, password, user, cfgm = args kv = util.TLSCertVerifier(cfgm, switch, 'pubkeys.tls_hardwaremanager').verify_cert - wc = webclient.SecureHTTPConnection( + wc = webclient.WebConnection( switch, 443, verifycallback=kv, timeout=5) wc.set_basic_credentials(user, password) macs, retcode = wc.grab_json_response_with_status('/affluent/macs/by-port') @@ -241,7 +240,7 @@ def _recv_offload(): eventlet.sleep(0) -def _map_switch_backend(args): +async def _map_switch_backend(args): """Manipulate portions of mac address map relevant to a given switch """ @@ -267,7 +266,7 @@ def _map_switch_backend(args): user = None if switch not in noaffluent: try: - return _affluent_map_switch(args) + return await _affluent_map_switch(args) except exc.PubkeyInvalid: log.log({'error': 'While trying to gather ethernet mac addresses ' 'from {0}, the TLS certificate failed validation. ' @@ -433,13 +432,13 @@ def _snmp_map_switch(switch, password, user): switchbackoff = 30 -def find_nodeinfo_by_mac(mac, configmanager): +async def find_nodeinfo_by_mac(mac, configmanager): now = util.monotonic_time() if vintage and (now - vintage) < 90 and mac in _nodesbymac: return _nodesbymac[mac][0], {'maccount': _nodesbymac[mac][1]} # do not actually sweep switches more than once every 30 seconds # however, if there is an update in progress, wait on it - for _ in update_macmap(configmanager, + async for _ in update_macmap(configmanager, vintage and (now - vintage) < switchbackoff): if mac in _nodesbymac: return _nodesbymac[mac][0], {'maccount': _nodesbymac[mac][1]} @@ -449,10 +448,10 @@ def find_nodeinfo_by_mac(mac, configmanager): return None, {'maccount': 0} -mapupdating = eventlet.semaphore.Semaphore() +mapupdating = asyncio.Lock() -def update_macmap(configmanager, impatient=False): +async def update_macmap(configmanager, impatient=False): """Interrogate switches to build/update mac table Begin a rebuild process. This process is a generator that will yield @@ -462,13 +461,13 @@ def update_macmap(configmanager, impatient=False): """ if mapupdating.locked(): while mapupdating.locked(): - eventlet.sleep(1) + await asyncio.sleep(1) yield None return if impatient: return completions = _full_updatemacmap(configmanager) - for completion in completions: + async for completion in completions: try: yield completion except GeneratorExit: @@ -483,7 +482,7 @@ def _finish_update(completions): pass -def _full_updatemacmap(configmanager): +async def _full_updatemacmap(configmanager): global vintage global _apimacmap global _macmap @@ -492,7 +491,7 @@ def _full_updatemacmap(configmanager): global _macsbyswitch global switchbackoff start = util.monotonic_time() - with mapupdating: + async with mapupdating: vintage = util.monotonic_time() # Clear all existing entries _macmap = {} @@ -554,10 +553,12 @@ def _full_updatemacmap(configmanager): if switch not in switches: del _macsbyswitch[switch] switchauth = get_switchcreds(configmanager, switches) - pool = GreenPool(64) - for ans in pool.imap(_map_switch, switchauth): - vintage = util.monotonic_time() - yield ans + #pool = GreenPool(64) + tsks = [] + for sa in switchauth: + tsks.append(_map_switch(sa)) + for tsk in asyncio.as_completed(tsks): + yield await tsk _apimacmap = _macmap endtime = util.monotonic_time() duration = endtime - start @@ -574,7 +575,7 @@ def _dump_locations(info, macaddr, nodename=None): portinfo = [] for location in info: portinfo.append({'switch': location[0], - 'port': location[1], 'macsonport': location[2]}) + 'port': location[1], 'macsonport': location[2]}) retdata['ports'] = sorted(portinfo, key=lambda x: x['macsonport'], reverse=True) yield msg.KeyValueData(retdata) @@ -587,7 +588,7 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents): pathcomponents == ['networking', 'macs', 'rescan']): if inputdata != {'rescan': 'start'}: raise exc.InvalidArgumentException('Input must be rescan=start') - eventlet.spawn_n(rescan, configmanager) + util.spawn(rescan(configmanager)) return [msg.KeyValueData({'rescan': 'started'})] raise exc.NotImplementedException( 'Operation {0} on {1} not implemented'.format( @@ -701,8 +702,8 @@ def dump_macinfo(macaddr): return _dump_locations(info, macaddr, _nodesbymac.get(macaddr, (None,))[0]) -def rescan(cfg): - for _ in update_macmap(cfg): +async def rescan(cfg): + async for _ in update_macmap(cfg): pass diff --git a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py index ea169b3b..a65e8a6b 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py @@ -55,7 +55,7 @@ class WebClient(object): return rsp -def renotify_me(node, configmanager, myname): +async def renotify_me(node, configmanager, myname): creds = configmanager.get_node_attributes( node, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True) wc = WebClient(node, configmanager, creds)