2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-03-09 17:49:32 +00:00

Further asyncio port of confluent

Advance state of basic clients to advance testing and soon start doing
deeper activity.
This commit is contained in:
Jarrod Johnson
2024-03-06 16:50:34 -05:00
parent 635ef6073c
commit d42e8e0921
6 changed files with 194 additions and 177 deletions

View File

@@ -1,4 +1,4 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2017 Lenovo
@@ -17,6 +17,7 @@
__author__ = 'alin37'
import asyncio
from getpass import getpass
import optparse
import os
@@ -36,123 +37,127 @@ if path.startswith('/opt'):
import confluent.client as client
argparser = optparse.OptionParser(
usage='''\n %prog [-b] noderange [list of attributes or 'all'] \
\n %prog -c noderange <list of attributes> \
\n %prog -e noderange <attribute names to set> \
\n %prog noderange attribute1=value1 attribute2=value,...
\n ''')
argparser.add_option('-b', '--blame', action='store_true',
help='Show information about how attributes inherited')
argparser.add_option('-e', '--environment', action='store_true',
help='Set attributes, but from environment variable of '
'same name')
argparser.add_option('-c', '--clear', action='store_true',
help='Clear attributes')
argparser.add_option('-p', '--prompt', action='store_true',
help='Prompt for attribute values interactively')
argparser.add_option('-m', '--maxnodes', type='int',
help='Prompt if trying to set attributes on more '
'than specified number of nodes')
argparser.add_option('-s', '--set', dest='set', metavar='settings.batch',
default=False, help='set attributes using a batch file')
(options, args) = argparser.parse_args()
async def main():
argparser = optparse.OptionParser(
usage='''\n %prog [-b] noderange [list of attributes or 'all'] \
\n %prog -c noderange <list of attributes> \
\n %prog -e noderange <attribute names to set> \
\n %prog noderange attribute1=value1 attribute2=value,...
\n ''')
argparser.add_option('-b', '--blame', action='store_true',
help='Show information about how attributes inherited')
argparser.add_option('-e', '--environment', action='store_true',
help='Set attributes, but from environment variable of '
'same name')
argparser.add_option('-c', '--clear', action='store_true',
help='Clear attributes')
argparser.add_option('-p', '--prompt', action='store_true',
help='Prompt for attribute values interactively')
argparser.add_option('-m', '--maxnodes', type='int',
help='Prompt if trying to set attributes on more '
'than specified number of nodes')
argparser.add_option('-s', '--set', dest='set', metavar='settings.batch',
default=False, help='set attributes using a batch file')
(options, args) = argparser.parse_args()
#setting minimal output to only output current information
showtype = 'current'
requestargs=None
try:
noderange = args[0]
nodelist = '/noderange/{0}/nodes/'.format(noderange)
except IndexError:
argparser.print_help()
sys.exit(1)
client.check_globbing(noderange)
session = client.Command()
exitcode = 0
#Sets attributes
nodetype="noderange"
if len(args) > 1:
if "=" in args[1] or options.clear or options.environment or options.prompt:
if "=" in args[1] and options.clear:
print("Can not clear and set at the same time!")
argparser.print_help()
sys.exit(1)
argassign = None
if options.prompt:
argassign = {}
for arg in args[1:]:
oneval = 1
twoval = 2
while oneval != twoval:
oneval = getpass('Enter value for {0}: '.format(arg))
twoval = getpass('Confirm value for {0}: '.format(arg))
if oneval != twoval:
print('Values did not match.')
argassign[arg] = twoval
session.stop_if_noderange_over(noderange, options.maxnodes)
exitcode=client.updateattrib(session,args,nodetype, noderange, options, argassign)
#setting minimal output to only output current information
showtype = 'current'
requestargs=None
try:
# setting user output to what the user inputs
if args[1] == 'all':
showtype = 'all'
requestargs=args[2:]
elif args[1] == 'current':
showtype = 'current'
requestargs=args[2:]
else:
showtype = 'all'
requestargs=args[1:]
except:
pass
elif options.clear or options.environment or options.prompt:
sys.stderr.write('Attribute names required with specified options\n')
argparser.print_help()
exitcode = 400
noderange = args[0]
nodelist = '/noderange/{0}/nodes/'.format(noderange)
except IndexError:
argparser.print_help()
sys.exit(1)
client.check_globbing(noderange)
session = client.Command()
exitcode = 0
elif options.set:
arglist = [noderange]
showtype='current'
argfile = open(options.set, 'r')
argset = argfile.readline()
while argset:
#Sets attributes
nodetype="noderange"
if len(args) > 1:
if "=" in args[1] or options.clear or options.environment or options.prompt:
if "=" in args[1] and options.clear:
print("Can not clear and set at the same time!")
argparser.print_help()
sys.exit(1)
argassign = None
if options.prompt:
argassign = {}
for arg in args[1:]:
oneval = 1
twoval = 2
while oneval != twoval:
oneval = getpass('Enter value for {0}: '.format(arg))
twoval = getpass('Confirm value for {0}: '.format(arg))
if oneval != twoval:
print('Values did not match.')
argassign[arg] = twoval
session.stop_if_noderange_over(noderange, options.maxnodes)
exitcode=client.updateattrib(session,args,nodetype, noderange, options, argassign)
try:
argset = argset[:argset.index('#')]
except ValueError:
# setting user output to what the user inputs
if args[1] == 'all':
showtype = 'all'
requestargs=args[2:]
elif args[1] == 'current':
showtype = 'current'
requestargs=args[2:]
else:
showtype = 'all'
requestargs=args[1:]
except:
pass
argset = argset.strip()
if argset:
arglist += shlex.split(argset)
argset = argfile.readline()
session.stop_if_noderange_over(noderange, options.maxnodes)
exitcode=client.updateattrib(session,arglist,nodetype, noderange, options, None)
if exitcode != 0:
elif options.clear or options.environment or options.prompt:
sys.stderr.write('Attribute names required with specified options\n')
argparser.print_help()
exitcode = 400
elif options.set:
arglist = [noderange]
showtype='current'
argfile = open(options.set, 'r')
argset = argfile.readline()
while argset:
try:
argset = argset[:argset.index('#')]
except ValueError:
pass
argset = argset.strip()
if argset:
arglist += shlex.split(argset)
argset = argfile.readline()
session.stop_if_noderange_over(noderange, options.maxnodes)
exitcode=client.updateattrib(session,arglist,nodetype, noderange, options, None)
if exitcode != 0:
sys.exit(exitcode)
# Lists all attributes
if len(args) > 0:
# setting output to all so it can search since if we do have something to search, we want to show all outputs even if it is blank.
if requestargs is None:
showtype = 'current'
elif requestargs == []:
#showtype already set
pass
else:
try:
requestargs.remove('all')
requestargs.remove('current')
except ValueError:
pass
exitcode = await client.printattributes(session, requestargs, showtype,nodetype, noderange, options)
else:
for res in session.read(nodelist):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode = 1
else:
print(res['item']['href'].replace('/', ''))
sys.exit(exitcode)
# Lists all attributes
if len(args) > 0:
# setting output to all so it can search since if we do have something to search, we want to show all outputs even if it is blank.
if requestargs is None:
showtype = 'current'
elif requestargs == []:
#showtype already set
pass
else:
try:
requestargs.remove('all')
requestargs.remove('current')
except ValueError:
pass
exitcode = client.printattributes(session, requestargs, showtype,nodetype, noderange, options)
else:
for res in session.read(nodelist):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode = 1
else:
print(res['item']['href'].replace('/', ''))
sys.exit(exitcode)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -1,4 +1,4 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2017 Lenovo
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import csv
import optparse
import os
@@ -81,16 +82,15 @@ def subscribe_discovery(options, session, subscribe, targ):
if 'status' in rsp:
print(rsp['status'])
def print_disco(options, session, currmac, outhandler, columns):
async def print_disco(options, session, currmac, outhandler, columns):
procinfo = {}
for tmpinfo in session.read('/discovery/by-mac/{0}'.format(currmac)):
async for tmpinfo in session.read('/discovery/by-mac/{0}'.format(currmac)):
procinfo.update(tmpinfo)
if 'Switch' in columns or 'Port' in columns:
if 'switch' in procinfo:
procinfo['port'] = procinfo['switchport']
else:
for tmpinfo in session.read(
async for tmpinfo in session.read(
'/networking/macs/by-mac/{0}'.format(currmac)):
if 'ports' in tmpinfo:
# The api sorts so that the most specific available value
@@ -160,12 +160,12 @@ def datum_complete(datum):
searchkeys = set(['mac', 'serial', 'uuid'])
def search_record(datum, options, session):
async def search_record(datum, options, session):
for searchkey in searchkeys:
options.__dict__[searchkey] = None
for searchkey in searchkeys & set(datum):
options.__dict__[searchkey] = datum[searchkey]
return list(list_matching_macs(options, session))
return [x async for x in list_matching_macs(options, session)]
def datum_to_attrib(datum):
@@ -245,7 +245,7 @@ def import_csv(options, session):
sys.exit(exitcode)
def list_discovery(options, session):
async def list_discovery(options, session):
orderby = None
if options.fields:
columns = []
@@ -261,23 +261,23 @@ def list_discovery(options, session):
if options.order.lower() == field.lower():
orderby = field
outhandler = client.Tabulator(columns)
for mac in list_matching_macs(options, session):
print_disco(options, session, mac, outhandler, columns)
for mac in [x async for x in list_matching_macs(options, session)]:
await print_disco(options, session, mac, outhandler, columns)
if options.csv:
outhandler.write_csv(sys.stdout, orderby)
else:
for row in outhandler.get_table(orderby):
print(row)
def clear_discovery(options, session):
for mac in list_matching_macs(options, session):
async def clear_discovery(options, session):
async for mac in list_matching_macs(options, session):
for res in session.delete('/discovery/by-mac/{0}'.format(mac)):
if 'deleted' in res:
print('Cleared info for {0}'.format(res['deleted']))
else:
print(repr(res))
def list_matching_macs(options, session, node=None, checknode=True):
async def list_matching_macs(options, session, node=None, checknode=True):
path = '/discovery/'
if node:
path += 'by-node/{0}/'.format(node)
@@ -299,17 +299,16 @@ def list_matching_macs(options, session, node=None, checknode=True):
path += 'by-mac/{0}'.format(options.mac)
result = list(session.read(path))[0]
if 'error' in result:
return []
return [options.mac.replace(':', '-')]
return
yield options.mac.replace(':', '-')
return
else:
path += 'by-mac/'
ret = []
for x in session.read(path):
async for x in session.read(path):
if 'item' in x and 'href' in x['item']:
ret.append(x['item']['href'])
return ret
yield x['item']['href']
def assign_discovery(options, session, needid=True):
async def assign_discovery(options, session, needid=True):
abort = False
if options.importfile:
return import_csv(options, session)
@@ -323,11 +322,11 @@ def assign_discovery(options, session, needid=True):
abort = True
if abort:
sys.exit(1)
matches = list_matching_macs(options, session, None if needid else options.node, False)
matches = [x async for x in list_matching_macs(options, session, None if needid else options.node, False)]
if not matches:
# Do a rescan to catch missing requested data
blocking_scan(session)
matches = list_matching_macs(options, session, None if needid else options.node, False)
matches = [x async for x in list_matching_macs(options, session, None if needid else options.node, False)]
if not matches:
sys.stderr.write("No matching discovery candidates found\n")
sys.exit(1)
@@ -351,7 +350,7 @@ def blocking_scan(session):
list(session.update('/networking/macs/rescan', {'rescan': 'start'}))
def main():
async def main():
parser = optparse.OptionParser(
usage='Usage: %prog [list|assign|rescan|clear|subscribe|unsubscribe|register] [options]')
# -a for 'address' maybe?
@@ -402,13 +401,13 @@ def main():
sys.exit(1)
session = client.Command()
if args[0] == 'list':
list_discovery(options, session)
await list_discovery(options, session)
if args[0] == 'clear':
clear_discovery(options, session)
if args[0] == 'assign':
assign_discovery(options, session)
await assign_discovery(options, session)
if args[0] == 'reassign':
assign_discovery(options, session, False)
await assign_discovery(options, session, False)
if args[0] == 'register':
register_endpoint(options, session, args[1])
if args[0] == 'subscribe':
@@ -421,4 +420,4 @@ def main():
if __name__ == '__main__':
main()
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -60,7 +60,7 @@ async def main():
requestargs=args[1:]
nodetype='noderange'
if len(args) > 1:
exitcode=client.printattributes(session,requestargs,showtype,nodetype,noderange,options)
exitcode=await client.printattributes(session,requestargs,showtype,nodetype,noderange,options)
else:
async for res in session.read(nodelist):
if 'error' in res:

View File

@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import optparse
import os
import signal
@@ -32,26 +33,32 @@ if path.startswith('/opt'):
import confluent.client as client
argparser = optparse.OptionParser(
usage='''\n %prog <noderange>
async def main():
argparser = optparse.OptionParser(
usage='''\n %prog <noderange>
\n ''')
argparser.add_option('-m', '--maxnodes', type='int',
help='Specify a maximum number of '
'nodes to delete, '
'prompting if over the threshold')
(options, args) = argparser.parse_args()
if len(args) != 1:
argparser.print_help()
sys.exit(1)
noderange = args[0]
client.check_globbing(noderange)
session = client.Command()
exitcode = 0
session.stop_if_noderange_over(noderange, options.maxnodes)
for r in session.delete('/noderange/{0}'.format(noderange)):
if 'error' in r:
sys.stderr.write(r['error'] + '\n')
exitcode |= 1
if 'deleted' in r:
print('{0}: deleted'.format(r['deleted']))
sys.exit(exitcode)
argparser.add_option('-m', '--maxnodes', type='int',
help='Specify a maximum number of '
'nodes to delete, '
'prompting if over the threshold')
(options, args) = argparser.parse_args()
if len(args) != 1:
argparser.print_help()
sys.exit(1)
noderange = args[0]
client.check_globbing(noderange)
session = client.Command()
exitcode = 0
await session.stop_if_noderange_over(noderange, options.maxnodes)
async for r in session.delete('/noderange/{0}'.format(noderange)):
if 'error' in r:
sys.stderr.write(r['error'] + '\n')
exitcode |= 1
if 'deleted' in r:
print('{0}: deleted'.format(r['deleted']))
sys.exit(exitcode)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -286,10 +286,10 @@ class Command(object):
cprint('')
return 0
def stop_if_noderange_over(self, noderange, maxnodes):
async def stop_if_noderange_over(self, noderange, maxnodes):
if maxnodes is None:
return
nsize = self.get_noderange_size(noderange)
nsize = await self.get_noderange_size(noderange)
if nsize > maxnodes:
if nsize == 1:
nodename = list(self.read(
@@ -304,16 +304,16 @@ class Command(object):
raise Exception("Aborting at user request")
def get_noderange_size(self, noderange):
async def get_noderange_size(self, noderange):
numnodes = 0
for node in self.read('/noderange/{0}/nodes/'.format(noderange)):
async for node in self.read('/noderange/{0}/nodes/'.format(noderange)):
if node.get('item', {}).get('href', None):
numnodes += 1
else:
raise Exception("Error trying to size noderange {0}".format(noderange))
return numnodes
def simple_nodegroups_command(self, noderange, resource, input=None, key=None, **kwargs):
async def simple_nodegroups_command(self, noderange, resource, input=None, key=None, **kwargs):
try:
rc = 0
if resource[0] == '/':
@@ -324,12 +324,12 @@ class Command(object):
else:
ikey = key
if input is None:
for res in self.read('/nodegroups/{0}/{1}'.format(
for res in await self.read('/nodegroups/{0}/{1}'.format(
noderange, resource)):
rc = self.handle_results(ikey, rc, res)
else:
kwargs[ikey] = input
for res in self.update('/nodegroups/{0}/{1}'.format(
for res in await self.update('/nodegroups/{0}/{1}'.format(
noderange, resource), kwargs):
rc = self.handle_results(ikey, rc, res)
return rc
@@ -349,20 +349,25 @@ class Command(object):
await self.ensure_connected()
if not self.authenticated:
raise Exception('Unauthenticated')
return await send_request('update', path, self.connection, parameters)
async for rsp in send_request(
'update', path, self.connection, parameters):
yield rsp
async def create(self, path, parameters=None):
await self.ensure_connected()
if not self.authenticated:
raise Exception('Unauthenticated')
async for rsp in send_request('create', path, self.connection, parameters):
async for rsp in send_request(
'create', path, self.connection, parameters):
yield rsp
async def delete(self, path, parameters=None):
await self.ensure_connected()
if not self.authenticated:
raise Exception('Unauthenticated')
return await send_request('delete', path, self.connection, parameters)
async for rsp in send_request(
'delete', path, self.connection, parameters):
yield rsp
def _connect_unix(self):
self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -474,20 +479,20 @@ def attrrequested(attr, attrlist, seenattributes, node=None):
return False
def printattributes(session, requestargs, showtype, nodetype, noderange, options):
async def printattributes(session, requestargs, showtype, nodetype, noderange, options):
path = '/{0}/{1}/attributes/{2}'.format(nodetype, noderange, showtype)
return print_attrib_path(path, session, requestargs, options)
return await print_attrib_path(path, session, requestargs, options)
def _sort_attrib(k):
if isinstance(k[1], dict) and k[1].get('sortid', None) is not None:
return k[1]['sortid']
return k[0]
def print_attrib_path(path, session, requestargs, options, rename=None, attrprefix=None):
async def print_attrib_path(path, session, requestargs, options, rename=None, attrprefix=None):
exitcode = 0
seenattributes = NestedDict()
allnodes = set([])
for res in session.read(path):
async for res in session.read(path):
if 'error' in res:
sys.stderr.write(res['error'] + '\n')
exitcode = 1

View File

@@ -33,6 +33,7 @@
# functions. Console is special and just get's passed through
# see API.txt
import asyncio
import confluent
import confluent.alerts as alerts
import confluent.log as log