From d42e8e0921abaa84f4122c3cf1eecaa56e699380 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 6 Mar 2024 16:50:34 -0500 Subject: [PATCH] Further asyncio port of confluent Advance state of basic clients to advance testing and soon start doing deeper activity. --- confluent_client/bin/nodeattrib | 231 ++++++++++++++------------- confluent_client/bin/nodediscover | 53 +++--- confluent_client/bin/nodelist | 2 +- confluent_client/bin/noderemove | 51 +++--- confluent_client/confluent/client.py | 33 ++-- confluent_server/confluent/core.py | 1 + 6 files changed, 194 insertions(+), 177 deletions(-) diff --git a/confluent_client/bin/nodeattrib b/confluent_client/bin/nodeattrib index f4b0331f..ab1cbcff 100755 --- a/confluent_client/bin/nodeattrib +++ b/confluent_client/bin/nodeattrib @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2017 Lenovo @@ -17,6 +17,7 @@ __author__ = 'alin37' +import asyncio from getpass import getpass import optparse import os @@ -36,123 +37,127 @@ if path.startswith('/opt'): import confluent.client as client -argparser = optparse.OptionParser( - usage='''\n %prog [-b] noderange [list of attributes or 'all'] \ - \n %prog -c noderange \ - \n %prog -e noderange \ - \n %prog noderange attribute1=value1 attribute2=value,... - \n ''') -argparser.add_option('-b', '--blame', action='store_true', - help='Show information about how attributes inherited') -argparser.add_option('-e', '--environment', action='store_true', - help='Set attributes, but from environment variable of ' - 'same name') -argparser.add_option('-c', '--clear', action='store_true', - help='Clear attributes') -argparser.add_option('-p', '--prompt', action='store_true', - help='Prompt for attribute values interactively') -argparser.add_option('-m', '--maxnodes', type='int', - help='Prompt if trying to set attributes on more ' - 'than specified number of nodes') -argparser.add_option('-s', '--set', dest='set', metavar='settings.batch', - default=False, help='set attributes using a batch file') -(options, args) = argparser.parse_args() +async def main(): + argparser = optparse.OptionParser( + usage='''\n %prog [-b] noderange [list of attributes or 'all'] \ + \n %prog -c noderange \ + \n %prog -e noderange \ + \n %prog noderange attribute1=value1 attribute2=value,... + \n ''') + argparser.add_option('-b', '--blame', action='store_true', + help='Show information about how attributes inherited') + argparser.add_option('-e', '--environment', action='store_true', + help='Set attributes, but from environment variable of ' + 'same name') + argparser.add_option('-c', '--clear', action='store_true', + help='Clear attributes') + argparser.add_option('-p', '--prompt', action='store_true', + help='Prompt for attribute values interactively') + argparser.add_option('-m', '--maxnodes', type='int', + help='Prompt if trying to set attributes on more ' + 'than specified number of nodes') + argparser.add_option('-s', '--set', dest='set', metavar='settings.batch', + default=False, help='set attributes using a batch file') + (options, args) = argparser.parse_args() -#setting minimal output to only output current information -showtype = 'current' -requestargs=None -try: - noderange = args[0] - nodelist = '/noderange/{0}/nodes/'.format(noderange) -except IndexError: - argparser.print_help() - sys.exit(1) -client.check_globbing(noderange) -session = client.Command() -exitcode = 0 - -#Sets attributes -nodetype="noderange" - -if len(args) > 1: - if "=" in args[1] or options.clear or options.environment or options.prompt: - if "=" in args[1] and options.clear: - print("Can not clear and set at the same time!") - argparser.print_help() - sys.exit(1) - argassign = None - if options.prompt: - argassign = {} - for arg in args[1:]: - oneval = 1 - twoval = 2 - while oneval != twoval: - oneval = getpass('Enter value for {0}: '.format(arg)) - twoval = getpass('Confirm value for {0}: '.format(arg)) - if oneval != twoval: - print('Values did not match.') - argassign[arg] = twoval - session.stop_if_noderange_over(noderange, options.maxnodes) - exitcode=client.updateattrib(session,args,nodetype, noderange, options, argassign) + #setting minimal output to only output current information + showtype = 'current' + requestargs=None try: - # setting user output to what the user inputs - if args[1] == 'all': - showtype = 'all' - requestargs=args[2:] - elif args[1] == 'current': - showtype = 'current' - requestargs=args[2:] - else: - showtype = 'all' - requestargs=args[1:] - except: - pass -elif options.clear or options.environment or options.prompt: - sys.stderr.write('Attribute names required with specified options\n') - argparser.print_help() - exitcode = 400 + noderange = args[0] + nodelist = '/noderange/{0}/nodes/'.format(noderange) + except IndexError: + argparser.print_help() + sys.exit(1) + client.check_globbing(noderange) + session = client.Command() + exitcode = 0 -elif options.set: - arglist = [noderange] - showtype='current' - argfile = open(options.set, 'r') - argset = argfile.readline() - while argset: + #Sets attributes + nodetype="noderange" + + if len(args) > 1: + if "=" in args[1] or options.clear or options.environment or options.prompt: + if "=" in args[1] and options.clear: + print("Can not clear and set at the same time!") + argparser.print_help() + sys.exit(1) + argassign = None + if options.prompt: + argassign = {} + for arg in args[1:]: + oneval = 1 + twoval = 2 + while oneval != twoval: + oneval = getpass('Enter value for {0}: '.format(arg)) + twoval = getpass('Confirm value for {0}: '.format(arg)) + if oneval != twoval: + print('Values did not match.') + argassign[arg] = twoval + session.stop_if_noderange_over(noderange, options.maxnodes) + exitcode=client.updateattrib(session,args,nodetype, noderange, options, argassign) try: - argset = argset[:argset.index('#')] - except ValueError: + # setting user output to what the user inputs + if args[1] == 'all': + showtype = 'all' + requestargs=args[2:] + elif args[1] == 'current': + showtype = 'current' + requestargs=args[2:] + else: + showtype = 'all' + requestargs=args[1:] + except: pass - argset = argset.strip() - if argset: - arglist += shlex.split(argset) - argset = argfile.readline() - session.stop_if_noderange_over(noderange, options.maxnodes) - exitcode=client.updateattrib(session,arglist,nodetype, noderange, options, None) -if exitcode != 0: + elif options.clear or options.environment or options.prompt: + sys.stderr.write('Attribute names required with specified options\n') + argparser.print_help() + exitcode = 400 + + elif options.set: + arglist = [noderange] + showtype='current' + argfile = open(options.set, 'r') + argset = argfile.readline() + while argset: + try: + argset = argset[:argset.index('#')] + except ValueError: + pass + argset = argset.strip() + if argset: + arglist += shlex.split(argset) + argset = argfile.readline() + session.stop_if_noderange_over(noderange, options.maxnodes) + exitcode=client.updateattrib(session,arglist,nodetype, noderange, options, None) + if exitcode != 0: + sys.exit(exitcode) + + # Lists all attributes + if len(args) > 0: + # setting output to all so it can search since if we do have something to search, we want to show all outputs even if it is blank. + if requestargs is None: + showtype = 'current' + elif requestargs == []: + #showtype already set + pass + else: + try: + requestargs.remove('all') + requestargs.remove('current') + except ValueError: + pass + exitcode = await client.printattributes(session, requestargs, showtype,nodetype, noderange, options) + else: + for res in session.read(nodelist): + if 'error' in res: + sys.stderr.write(res['error'] + '\n') + exitcode = 1 + else: + print(res['item']['href'].replace('/', '')) + sys.exit(exitcode) -# Lists all attributes -if len(args) > 0: - # setting output to all so it can search since if we do have something to search, we want to show all outputs even if it is blank. - if requestargs is None: - showtype = 'current' - elif requestargs == []: - #showtype already set - pass - else: - try: - requestargs.remove('all') - requestargs.remove('current') - except ValueError: - pass - exitcode = client.printattributes(session, requestargs, showtype,nodetype, noderange, options) -else: - for res in session.read(nodelist): - if 'error' in res: - sys.stderr.write(res['error'] + '\n') - exitcode = 1 - else: - print(res['item']['href'].replace('/', '')) - -sys.exit(exitcode) +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_client/bin/nodediscover b/confluent_client/bin/nodediscover index da8b0b21..48564195 100755 --- a/confluent_client/bin/nodediscover +++ b/confluent_client/bin/nodediscover @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2017 Lenovo @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import csv import optparse import os @@ -81,16 +82,15 @@ def subscribe_discovery(options, session, subscribe, targ): if 'status' in rsp: print(rsp['status']) -def print_disco(options, session, currmac, outhandler, columns): +async def print_disco(options, session, currmac, outhandler, columns): procinfo = {} - for tmpinfo in session.read('/discovery/by-mac/{0}'.format(currmac)): - + async for tmpinfo in session.read('/discovery/by-mac/{0}'.format(currmac)): procinfo.update(tmpinfo) if 'Switch' in columns or 'Port' in columns: if 'switch' in procinfo: procinfo['port'] = procinfo['switchport'] else: - for tmpinfo in session.read( + async for tmpinfo in session.read( '/networking/macs/by-mac/{0}'.format(currmac)): if 'ports' in tmpinfo: # The api sorts so that the most specific available value @@ -160,12 +160,12 @@ def datum_complete(datum): searchkeys = set(['mac', 'serial', 'uuid']) -def search_record(datum, options, session): +async def search_record(datum, options, session): for searchkey in searchkeys: options.__dict__[searchkey] = None for searchkey in searchkeys & set(datum): options.__dict__[searchkey] = datum[searchkey] - return list(list_matching_macs(options, session)) + return [x async for x in list_matching_macs(options, session)] def datum_to_attrib(datum): @@ -245,7 +245,7 @@ def import_csv(options, session): sys.exit(exitcode) -def list_discovery(options, session): +async def list_discovery(options, session): orderby = None if options.fields: columns = [] @@ -261,23 +261,23 @@ def list_discovery(options, session): if options.order.lower() == field.lower(): orderby = field outhandler = client.Tabulator(columns) - for mac in list_matching_macs(options, session): - print_disco(options, session, mac, outhandler, columns) + for mac in [x async for x in list_matching_macs(options, session)]: + await print_disco(options, session, mac, outhandler, columns) if options.csv: outhandler.write_csv(sys.stdout, orderby) else: for row in outhandler.get_table(orderby): print(row) -def clear_discovery(options, session): - for mac in list_matching_macs(options, session): +async def clear_discovery(options, session): + async for mac in list_matching_macs(options, session): for res in session.delete('/discovery/by-mac/{0}'.format(mac)): if 'deleted' in res: print('Cleared info for {0}'.format(res['deleted'])) else: print(repr(res)) -def list_matching_macs(options, session, node=None, checknode=True): +async def list_matching_macs(options, session, node=None, checknode=True): path = '/discovery/' if node: path += 'by-node/{0}/'.format(node) @@ -299,17 +299,16 @@ def list_matching_macs(options, session, node=None, checknode=True): path += 'by-mac/{0}'.format(options.mac) result = list(session.read(path))[0] if 'error' in result: - return [] - return [options.mac.replace(':', '-')] + return + yield options.mac.replace(':', '-') + return else: path += 'by-mac/' - ret = [] - for x in session.read(path): + async for x in session.read(path): if 'item' in x and 'href' in x['item']: - ret.append(x['item']['href']) - return ret + yield x['item']['href'] -def assign_discovery(options, session, needid=True): +async def assign_discovery(options, session, needid=True): abort = False if options.importfile: return import_csv(options, session) @@ -323,11 +322,11 @@ def assign_discovery(options, session, needid=True): abort = True if abort: sys.exit(1) - matches = list_matching_macs(options, session, None if needid else options.node, False) + matches = [x async for x in list_matching_macs(options, session, None if needid else options.node, False)] if not matches: # Do a rescan to catch missing requested data blocking_scan(session) - matches = list_matching_macs(options, session, None if needid else options.node, False) + matches = [x async for x in list_matching_macs(options, session, None if needid else options.node, False)] if not matches: sys.stderr.write("No matching discovery candidates found\n") sys.exit(1) @@ -351,7 +350,7 @@ def blocking_scan(session): list(session.update('/networking/macs/rescan', {'rescan': 'start'})) -def main(): +async def main(): parser = optparse.OptionParser( usage='Usage: %prog [list|assign|rescan|clear|subscribe|unsubscribe|register] [options]') # -a for 'address' maybe? @@ -402,13 +401,13 @@ def main(): sys.exit(1) session = client.Command() if args[0] == 'list': - list_discovery(options, session) + await list_discovery(options, session) if args[0] == 'clear': clear_discovery(options, session) if args[0] == 'assign': - assign_discovery(options, session) + await assign_discovery(options, session) if args[0] == 'reassign': - assign_discovery(options, session, False) + await assign_discovery(options, session, False) if args[0] == 'register': register_endpoint(options, session, args[1]) if args[0] == 'subscribe': @@ -421,4 +420,4 @@ def main(): if __name__ == '__main__': - main() + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_client/bin/nodelist b/confluent_client/bin/nodelist index 70ad36d3..e4f846ea 100755 --- a/confluent_client/bin/nodelist +++ b/confluent_client/bin/nodelist @@ -60,7 +60,7 @@ async def main(): requestargs=args[1:] nodetype='noderange' if len(args) > 1: - exitcode=client.printattributes(session,requestargs,showtype,nodetype,noderange,options) + exitcode=await client.printattributes(session,requestargs,showtype,nodetype,noderange,options) else: async for res in session.read(nodelist): if 'error' in res: diff --git a/confluent_client/bin/noderemove b/confluent_client/bin/noderemove index 816abd72..5a912fcf 100755 --- a/confluent_client/bin/noderemove +++ b/confluent_client/bin/noderemove @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import optparse import os import signal @@ -32,26 +33,32 @@ if path.startswith('/opt'): import confluent.client as client -argparser = optparse.OptionParser( - usage='''\n %prog + +async def main(): + argparser = optparse.OptionParser( + usage='''\n %prog \n ''') -argparser.add_option('-m', '--maxnodes', type='int', - help='Specify a maximum number of ' - 'nodes to delete, ' - 'prompting if over the threshold') -(options, args) = argparser.parse_args() -if len(args) != 1: - argparser.print_help() - sys.exit(1) -noderange = args[0] -client.check_globbing(noderange) -session = client.Command() -exitcode = 0 -session.stop_if_noderange_over(noderange, options.maxnodes) -for r in session.delete('/noderange/{0}'.format(noderange)): - if 'error' in r: - sys.stderr.write(r['error'] + '\n') - exitcode |= 1 - if 'deleted' in r: - print('{0}: deleted'.format(r['deleted'])) -sys.exit(exitcode) + argparser.add_option('-m', '--maxnodes', type='int', + help='Specify a maximum number of ' + 'nodes to delete, ' + 'prompting if over the threshold') + (options, args) = argparser.parse_args() + if len(args) != 1: + argparser.print_help() + sys.exit(1) + noderange = args[0] + client.check_globbing(noderange) + session = client.Command() + exitcode = 0 + await session.stop_if_noderange_over(noderange, options.maxnodes) + async for r in session.delete('/noderange/{0}'.format(noderange)): + if 'error' in r: + sys.stderr.write(r['error'] + '\n') + exitcode |= 1 + if 'deleted' in r: + print('{0}: deleted'.format(r['deleted'])) + sys.exit(exitcode) + + +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_client/confluent/client.py b/confluent_client/confluent/client.py index 633671f4..a7426983 100644 --- a/confluent_client/confluent/client.py +++ b/confluent_client/confluent/client.py @@ -286,10 +286,10 @@ class Command(object): cprint('') return 0 - def stop_if_noderange_over(self, noderange, maxnodes): + async def stop_if_noderange_over(self, noderange, maxnodes): if maxnodes is None: return - nsize = self.get_noderange_size(noderange) + nsize = await self.get_noderange_size(noderange) if nsize > maxnodes: if nsize == 1: nodename = list(self.read( @@ -304,16 +304,16 @@ class Command(object): raise Exception("Aborting at user request") - def get_noderange_size(self, noderange): + async def get_noderange_size(self, noderange): numnodes = 0 - for node in self.read('/noderange/{0}/nodes/'.format(noderange)): + async for node in self.read('/noderange/{0}/nodes/'.format(noderange)): if node.get('item', {}).get('href', None): numnodes += 1 else: raise Exception("Error trying to size noderange {0}".format(noderange)) return numnodes - def simple_nodegroups_command(self, noderange, resource, input=None, key=None, **kwargs): + async def simple_nodegroups_command(self, noderange, resource, input=None, key=None, **kwargs): try: rc = 0 if resource[0] == '/': @@ -324,12 +324,12 @@ class Command(object): else: ikey = key if input is None: - for res in self.read('/nodegroups/{0}/{1}'.format( + for res in await self.read('/nodegroups/{0}/{1}'.format( noderange, resource)): rc = self.handle_results(ikey, rc, res) else: kwargs[ikey] = input - for res in self.update('/nodegroups/{0}/{1}'.format( + for res in await self.update('/nodegroups/{0}/{1}'.format( noderange, resource), kwargs): rc = self.handle_results(ikey, rc, res) return rc @@ -349,20 +349,25 @@ class Command(object): await self.ensure_connected() if not self.authenticated: raise Exception('Unauthenticated') - return await send_request('update', path, self.connection, parameters) + async for rsp in send_request( + 'update', path, self.connection, parameters): + yield rsp async def create(self, path, parameters=None): await self.ensure_connected() if not self.authenticated: raise Exception('Unauthenticated') - async for rsp in send_request('create', path, self.connection, parameters): + async for rsp in send_request( + 'create', path, self.connection, parameters): yield rsp async def delete(self, path, parameters=None): await self.ensure_connected() if not self.authenticated: raise Exception('Unauthenticated') - return await send_request('delete', path, self.connection, parameters) + async for rsp in send_request( + 'delete', path, self.connection, parameters): + yield rsp def _connect_unix(self): self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -474,20 +479,20 @@ def attrrequested(attr, attrlist, seenattributes, node=None): return False -def printattributes(session, requestargs, showtype, nodetype, noderange, options): +async def printattributes(session, requestargs, showtype, nodetype, noderange, options): path = '/{0}/{1}/attributes/{2}'.format(nodetype, noderange, showtype) - return print_attrib_path(path, session, requestargs, options) + return await print_attrib_path(path, session, requestargs, options) def _sort_attrib(k): if isinstance(k[1], dict) and k[1].get('sortid', None) is not None: return k[1]['sortid'] return k[0] -def print_attrib_path(path, session, requestargs, options, rename=None, attrprefix=None): +async def print_attrib_path(path, session, requestargs, options, rename=None, attrprefix=None): exitcode = 0 seenattributes = NestedDict() allnodes = set([]) - for res in session.read(path): + async for res in session.read(path): if 'error' in res: sys.stderr.write(res['error'] + '\n') exitcode = 1 diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 21d24b1e..6832c399 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -33,6 +33,7 @@ # functions. Console is special and just get's passed through # see API.txt +import asyncio import confluent import confluent.alerts as alerts import confluent.log as log