2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-04-24 01:31:27 +00:00
Files
confluent/confluent_server/confluent/core.py

1565 lines
60 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
# Copyright 2015-2018 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# concept here that mapping from the resource tree and arguments go to
# specific python class signatures. The intent is to require
# plugin authors to come here if they *really* think they need new 'commands'
# and hopefully curtail deviation by each plugin author
# have to specify a standard place for cfg selection of *which* plugin
# as well as a standard to map API requests to Python functions
# e.g. <nodeelement>/power/state maps to some plugin
# HardwareManager.get_power/set_power selected by hardwaremanagement.method
# plugins can advertise a set of names if there is a desire for readable things
# exceptions to handle os images
# endpoints point to a class... usually, the class should have:
# -create
# -retrieve
# -update
# -delete
# functions. Console is special and just gets passed through
# see API.txt
import asyncio
import confluent
import confluent.alerts as alerts
import confluent.log as log
import confluent.asynctlvdata as tlvdata
import confluent.config.attributes as attrscheme
import confluent.config.configmanager as cfm
import confluent.collective.manager as collective
import confluent.discovery.core as disco
import confluent.interface.console as console
import confluent.exceptions as exc
import confluent.messages as msg
import confluent.mountmanager as mountmanager
import confluent.networking.macmap as macmap
import confluent.noderange as noderange
import confluent.osimage as osimage
import confluent.plugin as plugin
import types
try:
import confluent.shellmodule as shellmodule
except ImportError:
pass
import confluent.tasks as tasks
import confluent.util as util
import inspect
import itertools
import msgpack
import os
import struct
import sys
import uuid
import yaml
import shutil
# Placeholder for the confluent.vinzmanager module, which is imported on first
# use by handle_node_request.
vinz = None

# Plugin name -> loaded plugin object; filled in by load_plugins().
pluginmap = {}

# Plugin names for which handle_node_request checks quorum and honors the
# collective.manager attribute. Most names are listed in bytes and text form.
dispatch_plugins = (
    b'remoteconfig',
    b'ipmi', u'ipmi',
    b'redfish', u'redfish',
    b'tsmsol', u'tsmsol',
    b'geist', u'geist',
    b'deltapdu', u'deltapdu',
    b'eatonpdu', u'eatonpdu',
    b'raritan', u'raritan',
    b'affluent', u'affluent',
    b'cnos', u'cnos',
    b'enos', u'enos',
)

PluginCollection = plugin.PluginCollection

# Compatibility shim: guarantee that the name ``unicode`` resolves.
try:
    unicode
except NameError:
    unicode = str
async def iterate_responses(responses):
    """Normalize whatever a plugin returned into one async stream.

    ``responses`` may be an awaitable (possibly nested) that resolves to an
    iterable, an asynchronous iterable, or a plain synchronous iterable.
    Every item is yielded in order.

    Raises TypeError if the resolved object is not iterable at all.
    """
    # First, unwrap however many levels of awaitables the plugin handed back.
    while inspect.isawaitable(responses):
        responses = await responses
    # Anything implementing the async-iterator protocol (native async
    # generators included) has to be consumed with ``async for``.
    if hasattr(responses, '__aiter__'):
        async for rsp in responses:
            yield rsp
        return
    # Generators, lists, tuples and any other synchronous iterable.
    for rsp in responses:
        yield rsp
def seek_element(currplace, currkey, depth):
    """Return the child of ``currplace`` named ``currkey``.

    When ``currplace`` cannot be subscripted and is a PluginCollection, the
    collection itself is returned, since everything below it is up to the
    plugin to interpret. If the collection has a truthy ``maxdepth`` and
    ``depth`` exceeds it, the TypeError is re-raised instead.
    """
    try:
        found = currplace[currkey]
    except TypeError:
        if not isinstance(currplace, PluginCollection):
            raise
        if currplace.maxdepth and depth > currplace.maxdepth:
            raise
        # Plugin curated collection: children are the plugin's business.
        return currplace
    return found
def nested_lookup(nestdict, key):
    """Walk ``nestdict`` along the path components in ``key``.

    Each step goes through seek_element, which is told how many components
    remain (including the current one). Returns whatever the walk ends on.

    Raises exc.NotFoundException if a TypeError escapes the walk; a KeyError
    from a missing component propagates unchanged.
    """
    try:
        remaining = len(key)
        position = nestdict
        for component in key:
            position = seek_element(position, component, remaining)
            remaining -= 1
        return position
    except TypeError:
        raise exc.NotFoundException("Invalid element requested")
def load_plugins():
    """Build the resource trees and load every plugin into ``pluginmap``.

    The ``plugins`` directory next to this file is scanned; each of its
    subdirectories is temporarily put on ``sys.path`` while its modules are
    imported. ``.sh`` files are wrapped with shellmodule.Plugin. A module
    defining ``plugin_names`` is registered under each of those names,
    otherwise under its own module name. Any ``custom_resources`` a module
    declares are merged into the node resource tree.

    NOTE: the shellmodule import at the top of the file is optional, so a
    ``.sh`` plugin on a system without it raises NameError here.
    """
    _init_core()
    # The plugins directory sits alongside this source file.
    here = os.path.dirname(os.path.realpath(__file__))
    plugintop = os.path.realpath(os.path.join(here, 'plugins'))
    for category in os.listdir(plugintop):
        plugindir = os.path.join(plugintop, category)
        if not os.path.isdir(plugindir):
            continue
        sys.path.insert(1, plugindir)
        try:
            # two passes, to avoid adding both py and pyc files
            modnames = set()
            for entry in os.listdir(plugindir):
                if entry.startswith('.') or '__pycache__' in entry:
                    continue
                modname, ext = os.path.splitext(entry)
                if ext == '.sh':
                    pluginmap[modname] = shellmodule.Plugin(
                        os.path.join(plugindir, modname + '.sh'))
                elif '__init__' not in modname:
                    modnames.add(modname)
            for modname in modnames:
                tmpmod = __import__(modname)
                if 'plugin_names' in tmpmod.__dict__:
                    for name in tmpmod.plugin_names:
                        pluginmap[name] = tmpmod
                else:
                    pluginmap[modname] = tmpmod
                _register_resource(tmpmod)
        finally:
            # restore path to not include the plugindir, even when an
            # import above failed
            sys.path.pop(1)
    disco.register_affluent(pluginmap['affluent'])
def _register_resource(plugin):
global noderesources
if 'custom_resources' in plugin.__dict__:
_merge_dict(noderesources, plugin.custom_resources)
def _merge_dict(original, custom):
for k,v in custom.items():
if k in original:
if isinstance(original.get(k), dict):
_merge_dict(original.get(k), custom.get(k))
else:
original[k] = custom.get(k)
else:
original[k] = custom.get(k)
# Names offered at the top of the resource tree; entries ending in '/' are
# collections. (Presumably enumerated by the root request handler -- verify
# against the caller.)
rootcollections = [
    'deployment/',
    'discovery/',
    'events/',
    'networking/',
    'noderange/',
    'nodes/',
    'nodegroups/',
    'storage/',
    'usergroups/',
    'users/',
    'uuid',
    'version',
    'staging/',
]
class PluginRoute:
    """Leaf of the resource tree describing how a request is routed.

    ``routeinfo`` holds the routing dictionary as given. The handlers in this
    file look for the keys 'handler', 'pluginattrs' and 'default' in it.
    """

    def __init__(self, routedict):
        self.routeinfo = routedict
async def handle_storage(configmanager, inputdata, pathcomponents, operation):
    """Serve requests under the top-level ``storage`` collection.

    The collection root lists the single child ``remote/``. Anything under
    ``remote`` is delegated to mountmanager.handle_request with the remaining
    path components.

    Raises exc.NotFoundException for any other sub-path, matching
    handle_deployment, instead of silently producing an empty response.
    """
    if len(pathcomponents) == 1:
        yield msg.ChildCollection('remote/')
        return
    if pathcomponents[1] == 'remote':
        async for rsp in mountmanager.handle_request(
                configmanager, inputdata, pathcomponents[2:], operation):
            yield rsp
        return
    raise exc.NotFoundException('Unrecognized request')
async def handle_deployment(configmanager, inputdata, pathcomponents,
                            operation):
    """Serve requests under the top-level ``deployment`` collection.

    Handles the ``distributions``, ``profiles``, ``fingerprint`` and
    ``importing`` sub-trees and yields the response messages.

    Raises exc.InvalidArgumentException when a profile rebase is requested
    for a profile without a manifest, and exc.NotFoundException for any
    request that none of the branches recognizes.
    """
    depth = len(pathcomponents)
    if depth == 1:
        yield msg.ChildCollection('distributions/')
        yield msg.ChildCollection('profiles/')
        yield msg.ChildCollection('importing/')
        return
    section = pathcomponents[1]
    if section == 'distributions':
        if depth == 2 and operation == 'retrieve':
            for dist in osimage.list_distros():
                yield msg.ChildCollection(dist + '/')
            return
        # This used to compare the literal 'operation' with 'update', which
        # is never true, so a rescan request could never be honored.
        if depth == 3 and operation == 'update':
            distname = pathcomponents[-1]
            if inputdata.get('rescan', False):
                # NOTE(review): confirm osimage.rescan_dist exists and whether
                # it is a coroutine; awaiting only when needed covers both.
                pending = osimage.rescan_dist(distname)
                if inspect.isawaitable(pending):
                    await pending
                yield msg.KeyValueData({'rescanned': distname})
                return
    if section == 'profiles':
        if depth == 2 and operation == 'retrieve':
            for prof in osimage.list_profiles():
                yield msg.ChildCollection(prof + '/')
            return
        if depth >= 3:
            profname = pathcomponents[2]
            if depth == 4:
                if operation == 'retrieve' and pathcomponents[-1] == 'info':
                    # NOTE(review): profname comes from the request path;
                    # confirm callers reject components such as '..'.
                    with open('/var/lib/confluent/public/os/{}/profile.yaml'.format(profname)) as profyaml:
                        profinfo = yaml.safe_load(profyaml)
                    profinfo['name'] = profname
                    # check if boot.ipxe is older than profile.yaml
                    yield msg.KeyValueData(profinfo)
                    return
            elif depth == 3:
                if operation == 'retrieve':
                    yield msg.ChildCollection('info')
                    # Without this return the request fell through to the
                    # NotFoundException at the bottom after a valid answer.
                    return
                if operation == 'update':
                    if 'updateboot' in inputdata:
                        await osimage.update_boot(profname)
                        yield msg.KeyValueData({'updated': profname})
                        return
                    elif 'rebase' in inputdata:
                        try:
                            updated, customized = await osimage.rebase_profile(profname)
                        except osimage.ManifestMissing:
                            raise exc.InvalidArgumentException('Specified profile {0} does not have a manifest.yaml for rebase'.format(profname))
                        for upd in updated:
                            yield msg.KeyValueData({'updated': upd})
                        for cust in customized:
                            yield msg.KeyValueData({'customized': cust})
                        return
    if section == 'fingerprint':
        if operation == 'create':
            importer = osimage.MediaImporter()
            await importer.init(inputdata['filename'], configmanager, checkonly=True)
            medinfo = {
                'targetpath': importer.targpath,
                'name': importer.osname,
                'oscategory': importer.oscategory,
                'errors': importer.errors,
            }
            yield msg.KeyValueData(medinfo)
            return
    if section == 'importing':
        if depth == 2 or not pathcomponents[-1]:
            if operation == 'retrieve':
                for imp in osimage.list_importing():
                    yield imp
                return
            elif operation == 'create':
                importer = osimage.MediaImporter()
                if inputdata.get('custname', None):
                    await importer.init(inputdata['filename'],
                                        configmanager, inputdata['custname'])
                else:
                    await importer.init(inputdata['filename'],
                                        configmanager)
                yield msg.KeyValueData({'target': importer.targpath,
                                        'name': importer.importkey})
                return
        elif depth == 3:
            if operation == 'retrieve':
                for res in osimage.get_importing_status(pathcomponents[-1]):
                    yield res
                return
            elif operation == 'delete':
                for res in osimage.remove_importing(pathcomponents[-1]):
                    yield res
                return
    raise exc.NotFoundException('Unrecognized request')
def _init_core():
    """(Re)build the global ``noderesources`` and ``nodegroupresources`` trees.

    Nested dicts are collections, PluginRoute leaves are single resources and
    PluginCollection leaves are collections whose children the plugin
    defines. Every call builds fresh route objects.
    """
    global noderesources
    global nodegroupresources
    import confluent.shellserver as shellserver

    def hwroute():
        # Resource served by the plugin named in hardwaremanagement.method,
        # with ipmi as the fallback.
        return PluginRoute({
            'pluginattrs': ['hardwaremanagement.method'],
            'default': 'ipmi',
        })

    def hwcollection():
        # Same selection rule as hwroute, for plugin-defined collections.
        return PluginCollection({
            'pluginattrs': ['hardwaremanagement.method'],
            'default': 'ipmi',
        })

    def attrroute():
        # Resource always served by the 'attributes' plugin.
        return PluginRoute({'handler': 'attributes'})

    # _ prefix indicates internal use (e.g. special console scheme) and should
    # not be enumerated in any collection
    noderesources = {
        'attributes': {
            'rename': attrroute(),
            'all': attrroute(),
            'current': attrroute(),
            'expression': attrroute(),
        },
        'boot': {
            'nextdevice': hwroute(),
        },
        'configuration': {
            'management_controller': {
                'alerts': {
                    'destinations': hwcollection(),
                },
                'certificate': {
                    'sign': hwroute(),
                    'generate_csr': hwroute(),
                    'install': hwroute(),
                },
                'certificate_authorities': hwcollection(),
                'clear': hwroute(),
                'users': hwcollection(),
                'licenses': hwcollection(),
                'save_licenses': hwroute(),
                'net_interfaces': hwcollection(),
                'reset': hwroute(),
                'hostname': hwroute(),
                'identifier': hwroute(),
                'domain_name': hwroute(),
                'location': hwroute(),
                'ntp': {
                    'enabled': hwroute(),
                    'servers': hwcollection(),
                },
                'extended': {
                    'all': hwroute(),
                    'extra': hwroute(),
                    'advanced': hwroute(),
                    'extra_advanced': hwroute(),
                },
            },
            'storage': {
                'all': hwroute(),
                'arrays': hwcollection(),
                'disks': hwcollection(),
                'volumes': hwcollection(),
            },
            'system': {
                'all': hwroute(),
                'advanced': hwroute(),
                'clear': hwroute(),
            },
        },
        '_console': {
            'session': PluginRoute({
                'pluginattrs': ['console.method'],
            }),
        },
        '_shell': {
            'session': PluginRoute({
                # For now, not configurable, wait until there's demand
                'handler': 'ssh',
            }),
        },
        '_enclosure': {
            'reseat_bay': hwroute(),
        },
        'shell': {
            # another special case similar to console
            'sessions': PluginCollection({
                'handler': shellserver,
            }),
        },
        'console': {
            # this is a dummy value, http or socket must handle special
            'session': None,
            'license': hwroute(),
            'graphical': hwroute(),
            'ikvm': hwroute(),
            'ikvm_methods': hwroute(),
            'ikvm_screenshot': hwroute(),
        },
        'description': hwroute(),
        'deployment': {
            'lock': attrroute(),
            'ident_image': PluginRoute({
                'handler': 'identimage'
            }),
            'remote_config': {
                'run': PluginRoute({
                    'handler': 'remoteconfig'
                }),
                'active': PluginCollection({
                    'handler': 'remoteconfig'
                }),
            },
        },
        'events': {
            'hardware': {
                'log': hwroute(),
                'decode': hwroute(),
            },
        },
        # 'forward': {
        #     # Another dummy value, currently only for the gui
        #     'web': None,
        # },
        'health': {
            'hardware': hwroute(),
        },
        'identify': hwroute(),
        'inventory': {
            'hardware': {
                'all': hwcollection(),
            },
            'firmware': {
                'all': hwcollection(),
                'core': hwcollection(),
                'adapters': hwcollection(),
                'disks': hwcollection(),
                'misc': hwcollection(),
                'updatestatus': hwroute(),
                'updates': {
                    'active': hwcollection(),
                },
            },
        },
        'layout': PluginRoute({'handler': 'layout'}),
        'media': {
            'uploads': hwcollection(),
            'attach': hwroute(),
            'detach': hwroute(),
            'current': hwroute(),
        },
        'power': {
            'state': hwroute(),
            'inlets': PluginCollection({'handler': 'pdu'}),
            # Note: no 'default' here, unlike the other hardware collections.
            'outlets': PluginCollection({'pluginattrs': ['hardwaremanagement.method']}),
            'reseat': PluginRoute({'handler': 'enclosure'}),
        },
        'sensors': {
            'hardware': {
                'all': hwcollection(),
                'normalized': {
                    'inlet_temp': hwroute(),
                    'average_cpu_temp': hwroute(),
                    'total_power': hwroute(),
                },
                'energy': hwcollection(),
                'temperature': hwcollection(),
                'power': hwcollection(),
                'fans': hwcollection(),
                'leds': hwcollection(),
            },
        },
        'support': {
            'servicedata': hwcollection(),
        },
    }

    nodegroupresources = {
        'attributes': {
            'check': attrroute(),
            'rename': attrroute(),
            'all': attrroute(),
            'current': attrroute(),
        },
    }
async def create_user(inputdata, configmanager):
    """Create a user from ``inputdata``.

    ``name`` and ``role`` are required; both are removed from ``inputdata``
    and the rest is passed on as the user's attribute map.

    Raises exc.InvalidArgumentException if either key is missing. In that
    case ``inputdata`` is left untouched.
    """
    try:
        # Read both before deleting anything, so a missing role does not
        # leave the caller's dict half consumed.
        username = inputdata['name']
        role = inputdata['role']
    except (KeyError, ValueError):
        raise exc.InvalidArgumentException('Missing user name or role')
    del inputdata['name']
    del inputdata['role']
    await configmanager.create_user(username, role, attributemap=inputdata)
async def create_usergroup(inputdata, configmanager):
    """Create a usergroup from ``inputdata``.

    ``name`` and ``role`` are required and are removed from ``inputdata``.
    Only the name and role are passed to the config manager.

    Raises exc.InvalidArgumentException if either key is missing.
    """
    try:
        groupname = inputdata['name']
        role = inputdata['role']
        del inputdata['name']
        del inputdata['role']
    except (KeyError, ValueError):
        # The message used to say "user name", copied from create_user.
        raise exc.InvalidArgumentException("Missing usergroup name or role")
    await configmanager.create_usergroup(groupname, role)
async def update_usergroup(groupname, attribmap, configmanager):
    """Apply ``attribmap`` to the usergroup ``groupname``.

    A ValueError from the config manager is reported as
    exc.InvalidArgumentException carrying the same text.
    """
    try:
        await configmanager.set_usergroup(groupname, attribmap)
    except ValueError as err:
        reason = str(err)
        raise exc.InvalidArgumentException(reason)
async def update_user(name, attribmap, configmanager):
    """Apply ``attribmap`` to the user ``name``.

    A ValueError from the config manager is reported as
    exc.InvalidArgumentException carrying the same text.
    """
    try:
        await configmanager.set_user(name, attribmap)
    except ValueError as err:
        reason = str(err)
        raise exc.InvalidArgumentException(reason)
def show_usergroup(groupname, configmanager):
    """Yield one msg.Attributes per attribute stored for ``groupname``."""
    details = configmanager.get_usergroup(groupname)
    yield from (
        msg.Attributes(kv={attr: details[attr]}) for attr in details
    )
def show_user(name, configmanager):
    """Yield the attributes of user ``name`` as messages.

    Every attribute in attrscheme.user is reported, with None when the user
    has no value for it. The password is never disclosed; the message only
    says whether a crypted password is stored. The role, when present, is
    reported last.
    """
    userobj = configmanager.get_user(name)
    for attr in attrscheme.user:
        if attr == 'password':
            crypted = None
            if 'cryptpass' in userobj:
                crypted = {'cryptvalue': True}
            yield msg.CryptedAttributes(
                kv={'password': crypted},
                desc=attrscheme.user[attr]['description'])
            continue
        value = userobj[attr] if attr in userobj else None
        yield msg.Attributes(kv={attr: value},
                             desc=attrscheme.user[attr]['description'])
    if 'role' in userobj:
        yield msg.Attributes(kv={'role': userobj['role']})
async def stripnode(iterablersp, node):
    """Re-yield plugin responses with ``node`` stripped from each message.

    Console objects are passed through untouched. A None response makes the
    stream raise exc.NotImplementedException.
    """
    async for rsp in iterate_responses(iterablersp):
        if rsp is None:
            raise exc.NotImplementedException("Not Implemented")
        if not isinstance(rsp, console.Console):
            rsp.strip_node(node)
        yield rsp
async def iterate_collections(iterable, forcecollection=True):
    """Yield a deletable msg.ChildCollection for every name in ``iterable``.

    With ``forcecollection`` set, a trailing '/' is appended to names that do
    not already end in one.
    """
    for name in iterable:
        if forcecollection and name[-1] != '/':
            label = name + '/'
        else:
            label = name
        yield msg.ChildCollection(label, candelete=True)
async def iterate_resources(fancydict):
    """Yield a msg.ChildCollection for each public entry of ``fancydict``.

    Names starting with '_' are hidden. PluginRoute entries and the special
    name 'abbreviate' are listed bare; everything else gets a trailing '/'.
    """
    for name in fancydict:
        if name.startswith("_"):
            continue
        bare = name == 'abbreviate' or isinstance(fancydict[name], PluginRoute)
        yield msg.ChildCollection(name if bare else name + '/')
async def delete_user(user, configmanager):
    """Delete ``user`` and yield the matching msg.DeletedResource."""
    await configmanager.del_user(user)
    confirmation = msg.DeletedResource(user)
    yield confirmation
async def delete_usergroup(usergroup, configmanager):
    """Delete ``usergroup`` and yield the matching msg.DeletedResource."""
    await configmanager.del_usergroup(usergroup)
    confirmation = msg.DeletedResource(usergroup)
    yield confirmation
async def delete_nodegroup_collection(collectionpath, configmanager):
    """Delete the nodegroup addressed by a two-component path.

    Yields a msg.DeletedResource for the removed group.

    Raises exc.NotImplementedException for deeper paths. This used to be a
    bare Exception; the module's own exception type is used elsewhere for
    the same situation.
    """
    if len(collectionpath) != 2:
        raise exc.NotImplementedException("Not implemented")
    group = collectionpath[-1]  # just the nodegroup
    await configmanager.del_groups([group])
    yield msg.DeletedResource(group)
async def delete_node_collection(collectionpath, configmanager, isnoderange):
    """Delete the node, or every node of the noderange, named by the path.

    ``collectionpath`` must have exactly two components. With ``isnoderange``
    set, the second one is expanded as a noderange. Yields one
    msg.DeletedResource per removed node.

    Raises exc.NotImplementedException for deeper paths. This used to be a
    bare Exception; the module's own exception type is used elsewhere for
    the same situation.
    """
    if len(collectionpath) != 2:
        raise exc.NotImplementedException("Not implemented")
    nodes = [collectionpath[-1]]  # just node
    if isnoderange:
        nodes = noderange.NodeRange(nodes[0], configmanager).nodes
    await configmanager.del_nodes(nodes)
    for node in nodes:
        yield msg.DeletedResource(node)
def enumerate_nodegroup_collection(collectionpath, configmanager):
    """List the resources available below a nodegroup path.

    ``collectionpath[1]`` is the nodegroup name; the remaining components are
    looked up in nodegroupresources. Returns the iterate_resources stream
    for the collection found.

    Raises exc.NotFoundException if the nodegroup does not exist.
    """
    nodegroup = collectionpath[1]
    if not configmanager.is_nodegroup(nodegroup):
        raise exc.NotFoundException(
            'Invalid nodegroup: {0} not found'.format(nodegroup))
    # Slice rather than delete in place, so the caller's path list survives.
    subpath = collectionpath[2:]
    collection = nested_lookup(nodegroupresources, subpath)
    return iterate_resources(collection)
def enumerate_node_collection(collectionpath, configmanager):
    """List the children of a node or noderange collection path.

    ``['nodes']`` lists every node, sorted. Otherwise the components after
    the node/noderange name are looked up in noderesources and the entries
    of that collection are listed. A bare noderange additionally offers
    ``nodes/`` and ``abbreviate``.

    Raises exc.NotFoundException for an unknown node or a non-collection.
    """
    if collectionpath == ['nodes']:  # it is just '/nodes/', need to list nodes
        allnodes = list(configmanager.list_nodes())
        try:
            allnodes.sort(key=noderange.humanify_nodename)
        except TypeError:
            allnodes.sort()
        return iterate_collections(allnodes)
    nodeorrange = collectionpath[1]
    if collectionpath[0] == 'nodes' and not configmanager.is_node(nodeorrange):
        raise exc.NotFoundException("Invalid element requested")
    collection = nested_lookup(noderesources, collectionpath[2:])
    if not isinstance(collection, dict):
        raise exc.NotFoundException("Invalid element requested")
    if len(collectionpath) == 2 and collectionpath[0] == 'noderange':
        # With no sub-path the lookup returns the global noderesources dict
        # itself. Add the noderange-only entries to a copy, otherwise they
        # leak into every later node lookup and listing.
        collection = dict(collection)
        collection['nodes'] = {}
        collection['abbreviate'] = {}
    return iterate_resources(collection)
async def create_group(inputdata, configmanager):
    """Create a nodegroup named by ``inputdata['name']``.

    The name is removed from ``inputdata`` and the remainder becomes the
    group's attributes. Yields a msg.CreatedResource for the group.

    Raises exc.InvalidArgumentException when the name is missing or the
    config manager rejects the attributes with ValueError.
    """
    try:
        groupname = inputdata.pop('name')
    except KeyError:
        raise exc.InvalidArgumentException()
    newgroup = {groupname: inputdata}
    try:
        await configmanager.add_group_attributes(newgroup)
    except ValueError as err:
        raise exc.InvalidArgumentException(str(err))
    yield msg.CreatedResource(groupname)
async def create_node(inputdata, configmanager):
    """Create a node named by ``inputdata['name']``.

    The name is removed from ``inputdata`` and the remainder becomes the
    node's attributes. Yields a msg.CreatedResource for the node.

    Raises exc.InvalidArgumentException when the name is missing, contains a
    space, or the config manager rejects the attributes with ValueError.
    """
    if 'name' not in inputdata:
        raise exc.InvalidArgumentException('name not specified')
    nodename = inputdata['name']
    if ' ' in nodename:
        raise exc.InvalidArgumentException('Name "{0}" is not supported'.format(nodename))
    del inputdata['name']
    newnode = {nodename: inputdata}
    try:
        await configmanager.add_node_attributes(newnode)
    except ValueError as err:
        raise exc.InvalidArgumentException(str(err))
    yield msg.CreatedResource(nodename)
async def create_noderange(inputdata, configmanager):
    """Create every node of the noderange in ``inputdata['name']``.

    The name is removed from ``inputdata``; every node is given the same
    remaining attribute dict. Yields a msg.CreatedResource per node.

    Raises exc.InvalidArgumentException when a KeyError occurs while reading
    the name or expanding the range, or when the config manager rejects the
    attributes with ValueError.
    """
    try:
        rangeexpr = inputdata.pop('name')
        members = noderange.NodeRange(rangeexpr).nodes
        attribmap = {node: inputdata for node in members}
    except KeyError:
        raise exc.InvalidArgumentException('name not specified')
    try:
        await configmanager.add_node_attributes(attribmap)
    except ValueError as err:
        raise exc.InvalidArgumentException(str(err))
    for created in attribmap:
        yield msg.CreatedResource(created)
async def enumerate_collections(collections):
    """Yield a msg.ChildCollection for each name in ``collections``, as is."""
    for name in collections:
        entry = msg.ChildCollection(name)
        yield entry
async def handle_nodegroup_request(configmanager, inputdata,
                                   pathcomponents, operation):
    """Route a request under ``nodegroups`` and return its response stream.

    With fewer than two path components this creates a group or lists all
    groups. A two-component path, or one resolving to a dict in
    nodegroupresources, is a collection that can be retrieved or deleted.
    Anything else is handed to the plugin named by the route's 'handler'.

    Raises exc.NotFoundException for unknown elements, and a plain Exception
    for collection operations or route kinds that are not supported.
    """
    depth = len(pathcomponents)
    if depth < 2:
        if operation == "create":
            inputdata = msg.InputAttributes(pathcomponents, inputdata)
            return create_group(inputdata.attribs, configmanager)
        allgroups = list(configmanager.get_groups())
        try:
            allgroups.sort(key=noderange.humanify_nodename)
        except TypeError:
            allgroups.sort()
        return iterate_collections(allgroups)

    routespec = None
    if depth == 2:
        iscollection = True
    else:
        try:
            routespec = nested_lookup(nodegroupresources, pathcomponents[2:])
        except KeyError:
            raise exc.NotFoundException("Invalid element requested")
        # A PluginCollection is a collection too, but a plugin defined one,
        # so it is routed like a plain resource.
        iscollection = isinstance(routespec, dict)

    if iscollection:
        if operation == "delete":
            return delete_nodegroup_collection(pathcomponents,
                                               configmanager)
        if operation == "retrieve":
            return enumerate_nodegroup_collection(pathcomponents,
                                                  configmanager)
        raise Exception("TODO")

    plugroute = routespec.routeinfo
    inputdata = msg.get_input_message(
        pathcomponents[2:], operation, inputdata)
    if 'handler' not in plugroute:
        raise Exception("unknown case encountered")
    # fixed handler definition
    handler = getattr(pluginmap[plugroute['handler']], operation)
    return handler(
        nodes=None, element=pathcomponents,
        configmanager=configmanager,
        inputdata=inputdata)
class BadPlugin:
    """Stand-in handler for a node whose plugin name is not in pluginmap."""

    def __init__(self, node, plugin):
        self.node = node
        self.plugin = plugin

    def error(self, *args, **kwargs):
        """Yield one node error naming the unsupported plugin; args ignored."""
        complaint = self.plugin + ' is not a supported plugin'
        yield msg.ConfluentNodeError(self.node, complaint)
class BadCollective:
    """Stand-in handler for a node lacking collective.manager in a collective."""

    def __init__(self, node):
        self.node = node

    def error(self, *args, **kwargs):
        """Yield one node error about the missing manager; args ignored."""
        complaint = ('collective mode is active, but collective.manager '
                     'is not set for this node')
        yield msg.ConfluentNodeError(self.node, complaint)
def abbreviate_noderange(configmanager, inputdata, operation):
    """Turn a list of nodes into a noderange expression.

    ``inputdata['nodes']`` may be a list or a comma separated string; a
    string is split and stored back into ``inputdata``. Returns a one-element
    tuple holding a msg.KeyValueData with the key 'noderange'.

    Raises exc.InvalidArgumentException unless the operation is 'create' and
    'nodes' is present.
    """
    if operation != 'create':
        raise exc.InvalidArgumentException('Must be a create with nodes in list')
    if 'nodes' not in inputdata:
        raise exc.InvalidArgumentException('Must be given list of nodes under key named nodes')
    if isinstance(inputdata['nodes'], (str, unicode)):
        inputdata['nodes'] = inputdata['nodes'].split(',')
    reversed_range = noderange.ReverseNodeRange(inputdata['nodes'], configmanager)
    summary = msg.KeyValueData({'noderange': reversed_range.noderange})
    return (summary,)
async def _keepalivefn(connection, xmitlock):
    """Send a keepalive frame on ``connection`` every 30 seconds, forever.

    ``connection`` is the (reader, writer) pair used throughout
    handle_dispatch, so the frame goes through the writer at index 1; the
    pair itself has no ``sendall``. The frame is an 8-byte length of one
    followed by a single zero byte, the same length-prefixed framing that
    _forward_rsp produces. ``xmitlock`` keeps it from interleaving with a
    response being written.

    Runs until the task is cancelled.
    """
    writer = connection[1]
    while True:
        await asyncio.sleep(30)
        async with xmitlock:
            writer.write(b'\x00\x00\x00\x00\x00\x00\x00\x01\x00')
            await writer.drain()
async def _end_dispatch(connection, keepalive):
    """Cancel the keepalive, send the zero-length end frame and close."""
    keepalive.cancel()
    writer = connection[1]
    # The terminator must be bytes; a str here is rejected by the writer.
    writer.write(b'\x00\x00\x00\x00\x00\x00\x00\x00')
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def handle_dispatch(connection, cert, dispatch, peername):
    """Serve a request that a collective peer dispatched to this member.

    ``connection`` is a (reader, writer) pair, ``cert`` the peer certificate
    and ``dispatch`` the raw request: two magic bytes followed by a msgpack
    document with the keys tenant, nodes, inputdata, operation, path and
    isnoderange. Every response is forwarded as a length-prefixed frame; a
    zero-length frame marks the end.

    The connection is closed without a reply when the certificate does not
    match the peer's recorded fingerprint or the magic bytes are wrong.
    """
    if not util.cert_matches(
            cfm.get_collective_member(peername)['fingerprint'], cert):
        # connection is a pair, so the writer has to be closed, not the pair.
        connection[1].close()
        await connection[1].wait_closed()
        return
    if dispatch[0:2] != b'\x01\x03':  # magic value to indicate msgpack
        # We only support msgpack now
        # The magic should preclude any pickle, as the first byte can never be
        # under 0x20 or so.
        connection[1].close()
        await connection[1].wait_closed()
        return
    xmitlock = asyncio.Lock()
    keepalive = tasks.spawn_task(_keepalivefn(connection, xmitlock))
    dispatch = msgpack.unpackb(dispatch[2:], raw=False)
    configmanager = cfm.ConfigManager(dispatch['tenant'])
    nodes = dispatch['nodes']
    inputdata = dispatch['inputdata']
    operation = dispatch['operation']
    pathcomponents = dispatch['path']
    routespec = nested_lookup(noderesources, pathcomponents)
    try:
        inputdata = msg.get_input_message(
            pathcomponents, operation, inputdata, nodes, dispatch['isnoderange'],
            configmanager)
    except Exception as res:
        # Report the input problem to the peer, then end the stream.
        async with xmitlock:
            await _forward_rsp(connection, res)
        await _end_dispatch(connection, keepalive)
        return
    plugroute = routespec.routeinfo
    nodesbyhandler = {}
    nodeattr = configmanager.get_node_attributes(
        nodes, plugroute['pluginattrs'])
    for node in nodes:
        plugpath = None
        for attrname in plugroute['pluginattrs']:
            if attrname in nodeattr[node]:
                plugpath = nodeattr[node][attrname]['value']
        if not plugpath and 'default' in plugroute:
            plugpath = plugroute['default']
        if plugpath:
            try:
                hfunc = getattr(pluginmap[plugpath], operation)
            except KeyError:
                nodesbyhandler[BadPlugin(node, plugpath).error] = [node]
                continue
            nodesbyhandler.setdefault(hfunc, []).append(node)
    try:
        passvalues = asyncio.Queue()
        numworkers = 0
        for hfunc in nodesbyhandler:
            numworkers += 1
            tasks.spawn(addtoqueue(passvalues, hfunc, {
                'nodes': nodesbyhandler[hfunc],
                'element': pathcomponents,
                'configmanager': configmanager,
                'inputdata': inputdata}))
        async for res in iterate_queue(numworkers, passvalues):
            async with xmitlock:
                await _forward_rsp(connection, res)
    except Exception as res:
        print("oh noes, " + repr(res))
        async with xmitlock:
            await _forward_rsp(connection, res)
    await _end_dispatch(connection, keepalive)
async def _forward_rsp(connection, res):
try:
r = res.serialize()
except AttributeError:
if isinstance(res, Exception):
r = msgpack.packb(['Exception', str(res)], use_bin_type=False)
else:
r = msgpack.packb(
['Exception', 'Unable to serialize response ' + repr(res)],
use_bin_type=False)
except Exception as e:
r = msgpack.packb(
['Exception',
'Unable to serialize response ' + repr(res) + ' due to ' + str(e)],
use_bin_type=False)
rlen = len(r)
if not rlen:
return
connection[1].write(struct.pack('!Q', rlen))
connection[1].write(r)
await connection[1].drain()
async def handle_node_request(configmanager, inputdata, operation,
                              pathcomponents, autostrip=True):
    """Route a request under ``nodes`` or ``noderange``.

    Returns the response stream (or, for noderange abbreviation, a tuple of
    messages) for the request. Short paths create or list nodes; collection
    paths are listed or deleted; resource paths go to plugins. The plugin is
    either fixed by the route ('handler') or chosen per node from the node
    attributes named in 'pluginattrs', falling back to the route's
    'default'. Nodes owned by another collective member are forwarded
    through dispatch_request.

    With ``autostrip`` set and a single-node request, the node name is
    stripped from the responses.

    Note: ``pathcomponents`` has its first two entries removed in place
    before a resource request is routed.

    Raises exc.TargetResourceUnavailable when logging reports a full
    filesystem, exc.NotFoundException for unknown nodes, noderanges or
    elements, exc.InvalidArgumentException for unsupported operations on
    short paths or resources needing a custom interface, and
    exc.NotImplementedException when no handler could be found.
    """
    global vinz
    if log.logfull:
        raise exc.TargetResourceUnavailable('Filesystem full, free up space and restart confluent service')
    iscollection = False
    routespec = None
    if pathcomponents[0] == 'noderange':
        if len(pathcomponents) > 3 and pathcomponents[2] == 'nodes':
            # transform into a normal looking node request
            # this does mean we don't see if it is a valid
            # child, but that's not a goal for the noderange
            # facility anyway
            isnoderange = False
            pathcomponents = pathcomponents[2:]
        elif len(pathcomponents) == 3 and pathcomponents[2] == 'abbreviate':
            return abbreviate_noderange(configmanager, inputdata, operation)
        else:
            isnoderange = True
    else:
        isnoderange = False
    try:
        nodeorrange = pathcomponents[1]
        if not isnoderange and not configmanager.is_node(nodeorrange):
            raise exc.NotFoundException(f'Invalid Node: {repr(pathcomponents)}')
        # The abbreviate case has already returned above, so isnoderange
        # alone decides whether the name has to be expanded.
        if isnoderange:
            try:
                nodes = noderange.NodeRange(nodeorrange, configmanager).nodes
            except Exception as e:
                raise exc.NotFoundException("Invalid Noderange: " + str(e))
        else:
            nodes = (nodeorrange,)
    except IndexError:  # doesn't actually have a long enough path
        # this is enumerating a list of nodes or just empty noderange
        if isnoderange and operation == "retrieve":
            return iterate_collections([])
        elif isnoderange and operation == "create":
            inputdata = msg.InputAttributes(pathcomponents, inputdata)
            return create_noderange(inputdata.attribs, configmanager)
        elif isnoderange or operation == "delete":
            raise exc.InvalidArgumentException()
        if operation == "create":
            inputdata = msg.InputAttributes(pathcomponents, inputdata)
            return create_node(inputdata.attribs, configmanager)
        allnodes = list(configmanager.list_nodes())
        try:
            allnodes.sort(key=noderange.humanify_nodename)
        except TypeError:
            allnodes.sort()
        return iterate_collections(allnodes)
    if (isnoderange and len(pathcomponents) == 3 and
            pathcomponents[2] == 'nodes'):
        # this means that it's a list of relevant nodes
        nodes = list(nodes)
        try:
            nodes.sort(key=noderange.humanify_nodename)
        except TypeError:
            nodes.sort()
        return iterate_collections(nodes)
    if len(pathcomponents) == 2:
        iscollection = True
    else:
        try:
            routespec = nested_lookup(noderesources, pathcomponents[2:])
        except KeyError:
            raise exc.NotFoundException("Invalid element requested")
        if isinstance(routespec, dict):
            iscollection = True
        elif isinstance(routespec, PluginCollection):
            iscollection = False  # it is a collection, but plugin defined
        elif routespec is None:
            raise exc.InvalidArgumentException('Custom interface required for resource')
    if iscollection:
        if operation == "delete":
            return delete_node_collection(pathcomponents, configmanager,
                                          isnoderange)
        elif operation == "retrieve":
            return enumerate_node_collection(pathcomponents, configmanager)
        else:
            raise Exception("TODO here")
    del pathcomponents[0:2]
    passvalues = asyncio.Queue()
    plugroute = routespec.routeinfo
    _plugin = None
    if 'handler' in plugroute:  # fixed handler definition, easy enough
        if isinstance(plugroute['handler'], str):
            hfunc = getattr(pluginmap[plugroute['handler']], operation)
            _plugin = pluginmap[plugroute['handler']]
        else:
            hfunc = getattr(plugroute['handler'], operation)
            _plugin = plugroute['handler']
        msginputdata = _get_input_data(_plugin, pathcomponents, operation,
                                       inputdata, nodes, isnoderange,
                                       configmanager)
        passvalue = hfunc(
            nodes=nodes, element=pathcomponents,
            configmanager=configmanager,
            inputdata=msginputdata)
        if isnoderange:
            return passvalue
        else:
            return stripnode(passvalue, nodes[0])
    elif 'pluginattrs' in plugroute:
        nodeattr = configmanager.get_node_attributes(
            nodes, plugroute['pluginattrs'] + ['collective.manager'])
        nodesbymanager = {}
        nodesbyhandler = {}
        # Remember which plugin supplied each handler, so that every handler
        # gets input data built by its own plugin rather than by whichever
        # plugin happened to be resolved last in the loop below.
        pluginbyhandler = {}
        badcollnodes = []
        for node in nodes:
            plugpath = None
            for attrname in plugroute['pluginattrs']:
                if attrname in nodeattr[node]:
                    plugpath = nodeattr[node][attrname]['value']
            if not plugpath and 'default' in plugroute:
                plugpath = plugroute['default']
            if plugpath in dispatch_plugins:
                cfm.check_quorum()
                if pathcomponents == ['console', 'ikvm']:
                    if not vinz:
                        import confluent.vinzmanager as vinz
                    vinz.assure_vinz()
                manager = nodeattr[node].get('collective.manager', {}).get(
                    'value', None)
                if manager:
                    if collective.get_myname() != manager:
                        # Another collective member owns this node.
                        nodesbymanager.setdefault(manager, set()).add(node)
                        continue
                elif list(cfm.list_collective()):
                    badcollnodes.append(node)
                    continue
            if plugpath:
                try:
                    _plugin = pluginmap[plugpath]
                    hfunc = getattr(pluginmap[plugpath], operation)
                except KeyError:
                    nodesbyhandler[BadPlugin(node, plugpath).error] = [node]
                    continue
                nodesbyhandler.setdefault(hfunc, []).append(node)
                pluginbyhandler[hfunc] = _plugin
        for bn in badcollnodes:
            nodesbyhandler[BadCollective(bn).error] = [bn]
        numworkers = 0
        for hfunc in nodesbyhandler:
            numworkers += 1
            # Error handlers have no plugin of their own and ignore their
            # arguments; they keep using the most recently resolved plugin.
            handlerplugin = pluginbyhandler.get(hfunc, _plugin)
            tasks.spawn(addtoqueue(passvalues, hfunc, {
                'nodes': nodesbyhandler[hfunc],
                'element': pathcomponents,
                'configmanager': configmanager,
                'inputdata': _get_input_data(handlerplugin, pathcomponents,
                                             operation, inputdata, nodes,
                                             isnoderange, configmanager)}))
        for manager in nodesbymanager:
            numworkers += 1
            tasks.spawn(addtoqueue(passvalues, dispatch_request, {
                'nodes': nodesbymanager[manager], 'manager': manager,
                'element': pathcomponents, 'configmanager': configmanager,
                'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange}))
        if isnoderange or not autostrip:
            return iterate_queue(numworkers, passvalues)
        if numworkers > 0:
            return iterate_queue(numworkers, passvalues, nodes[0])
        raise exc.NotImplementedException()
# elif isinstance(passvalues[0], console.Console):
# return passvalues[0]
# else:
# return stripnode(passvalues[0], nodes[0])
def _get_input_data(plugin_ext, pathcomponents, operation, inputdata,
                    nodes, isnoderange, configmanager):
    """Turn raw request input into an input message object.

    When ``plugin_ext`` is not None and exposes ``get_input_message``, that
    callable does the conversion; otherwise the conversion is delegated to
    ``msg.get_input_message``.  Both are called with the same positional
    arguments: (pathcomponents, operation, inputdata, nodes, isnoderange,
    configmanager), and whatever they return is returned unchanged.
    """
    plugin_parses = (plugin_ext is not None
                     and hasattr(plugin_ext, 'get_input_message'))
    # Pick the provider first, then make a single call with the shared
    # argument list.
    provider = plugin_ext if plugin_parses else msg
    return provider.get_input_message(pathcomponents, operation, inputdata,
                                      nodes, isnoderange, configmanager)
async def iterate_queue(numworkers, passvalues, strip=False):
    """Yield items from ``passvalues`` until every worker has finished.

    Each worker is expected to put the sentinel string ``'theend'`` on the
    queue when it is done; iteration stops once ``numworkers`` sentinels have
    been consumed.  An item that is an ``Exception`` instance is raised
    rather than yielded.  When ``strip`` is truthy, ``strip_node(strip)`` is
    called on every item that is not a ``console.Console`` before it is
    yielded.
    """
    finished = 0
    while finished < numworkers:
        item = await passvalues.get()
        if item == 'theend':
            finished += 1
            continue
        if isinstance(item, Exception):
            raise item
        if strip and not isinstance(item, console.Console):
            item.strip_node(strip)
        yield item
async def addtoqueue(theq, fun, kwargs):
    """Run ``fun(**kwargs)`` and feed its output into the queue ``theq``.

    A ``console.Console`` result is queued as a single item; any other result
    is passed through ``iterate_responses`` and each response is queued in
    turn.  An exception raised along the way is queued instead of
    propagating.  The sentinel string ``'theend'`` is always queued last so
    the consumer can count this worker as finished.
    """
    try:
        outcome = fun(**kwargs)
        if not isinstance(outcome, console.Console):
            async for response in iterate_responses(outcome):
                await theq.put(response)
        else:
            await theq.put(outcome)
    except Exception as failure:
        await theq.put(failure)
    finally:
        await theq.put('theend')
async def _dispatch_read_exact(reader, count):
    """Read exactly ``count`` bytes from ``reader`` via repeated ``read()``.

    Returns the accumulated bytes, or None when a read raises or returns no
    data before ``count`` bytes have arrived.
    """
    data = b''
    while len(data) < count:
        try:
            chunk = await reader.read(count - len(data))
        except Exception:
            return None
        if not chunk:
            return None
        data += chunk
    return data


async def dispatch_request(nodes, manager, element, configmanager, inputdata,
                           operation, isnoderange):
    """Forward a request to the collective member ``manager`` and relay replies.

    This is an async generator.  The member record is looked up through
    ``configmanager.get_collective_member(manager)`` and a connection is
    opened with ``collective.connect_to_collective``.  The request (nodes,
    path, tenant, operation, inputdata, isnoderange) is sent as a
    msgpack-encoded payload, after which replies are read as frames made of
    an 8-byte big-endian length followed by that many bytes.  A zero length
    ends the exchange.

    Yields:
        Deserialized response messages from the remote member, or one
        ``msg.ConfluentResourceUnavailable`` per node when the member is
        unknown, cannot be connected to, or stops responding mid-stream.

    Raises:
        Any exception object the remote side sent back, or ``Exception`` when
        a frame deserializes to an empty value.
    """
    a = configmanager.get_collective_member(manager)
    try:
        remote = await collective.connect_to_collective(
            a['fingerprint'], a['address'])
    except Exception as e:
        # Report the failure per node instead of re-raising.  This also
        # covers an unknown manager, where ``a`` is None and the subscript
        # above fails.
        for node in nodes:
            if a:
                yield msg.ConfluentResourceUnavailable(
                    node, 'Collective member {0} is unreachable ({1})'.format(
                        a['name'], str(e)))
            else:
                yield msg.ConfluentResourceUnavailable(
                    node,
                    '"{0}" is not recognized as a collective member'.format(
                        manager))
        return

    def unreachable_messages():
        # One message per requested node, built only when actually needed.
        return [
            msg.ConfluentResourceUnavailable(
                node, 'Collective member {0} went unreachable'.format(
                    a['name']))
            for node in nodes]

    # Two messages are received from the peer before the request is sent;
    # their contents are not used here.
    await tlvdata.recv(remote)
    await tlvdata.recv(remote)
    myname = collective.get_myname()
    dreq = b'\x01\x03' + msgpack.packb(
        {'name': myname, 'nodes': list(nodes),
         'path': element, 'tenant': configmanager.tenant,
         'operation': operation, 'inputdata': inputdata,
         'isnoderange': isnoderange}, use_bin_type=False)
    await tlvdata.send(remote,
                       {'dispatch': {'name': myname, 'length': len(dreq)}})
    remote[1].write(dreq)
    await remote[1].drain()
    while True:
        header = await _dispatch_read_exact(remote[0], 8)
        if header is None:
            for unavailable in unreachable_messages():
                yield unavailable
            return
        rlen = struct.unpack('!Q', header)[0]
        if rlen == 0:
            # Zero-length frame marks the end of the reply stream.
            break
        rsp = await _dispatch_read_exact(remote[0], rlen)
        if rsp is None:
            for unavailable in unreachable_messages():
                yield unavailable
            return
        if rsp == b'\x00':
            # Single NUL payload carries no message; skip it.
            # NOTE(review): presumably a keepalive - confirm with the sender.
            continue
        try:
            rsp = msg.msg_deserialize(rsp)
        except Exception:
            # Not a regular message; try to interpret it as an exception.
            rsp = exc.deserialize_exc(rsp)
        if isinstance(rsp, Exception):
            raise rsp
        if not rsp:
            raise Exception('Error in cross-collective serialize/deserialze, see remote logs')
        yield rsp
def handle_discovery(pathcomponents, operation, configmanager, inputdata):
    """Placeholder handler for discovery paths.

    Inspects the first path component (so an empty ``pathcomponents`` raises
    ``IndexError``) but performs no action and always returns None.
    """
    wants_detected = pathcomponents[0] == 'detected'
    if wants_detected:
        # Nothing is implemented for the 'detected' element.
        return None
    return None
class Staging:
    """Filesystem staging area for a single client upload.

    Every staged upload lives in its own sub-directory of ``ASSET_ROOT``,
    named after the identifier handed to the constructor.  A small
    ``filename.txt`` marker inside that directory records the name the
    uploaded file should be stored under.
    """

    # Directory (with trailing separator) holding all staging sub-directories.
    ASSET_ROOT = '/var/lib/confluent/client_assets/'

    def __init__(self, user, uuid):
        """Bind to the staging directory for ``uuid``.

        The asset root is created if it does not exist yet; the per-upload
        directory itself is only created by ``create_directory``.

        Raises:
            OSError: if the asset root is missing and cannot be created.
        """
        self.uuid_str = uuid
        self.storage_folder = self.ASSET_ROOT + self.uuid_str
        self.filename = None
        self.user = user
        # Records whether the asset root already existed at construction.
        self.base_folder = os.path.exists(self.ASSET_ROOT)
        if not self.base_folder:
            try:
                os.mkdir(self.ASSET_ROOT)
            except Exception as e:
                raise OSError(str(e)) from e

    @staticmethod
    def _is_single_component(name):
        """Return True when ``name`` is one plain path component.

        Rejects the empty string, ``.``, ``..``, embedded NUL and anything
        containing a path separator, so that joining ``name`` onto a
        directory can never resolve outside of (or to) that directory.
        """
        if not name or name in ('.', '..') or '\0' in name:
            return False
        separators = [os.sep]
        if os.altsep:
            separators.append(os.altsep)
        return not any(sep in name for sep in separators)

    def getUUID(self):
        """Return the identifier this staging area was created with."""
        return self.uuid_str

    def get_push_url(self):
        """Return the relative URL ``staging/<user>/<uuid>`` for the upload."""
        return 'staging/{0}/{1}'.format(self.user, self.uuid_str)

    def create_directory(self):
        """Create the per-upload directory.

        Returns:
            True on success.

        Raises:
            exc.InvalidArgumentException: if the directory cannot be created.
        """
        try:
            os.mkdir(self.storage_folder)
            return True
        except OSError as e:
            raise exc.InvalidArgumentException(str(e))

    def get_file_name(self):
        """Consume the ``filename.txt`` marker and return the target path.

        The marker is read and then deleted, so a second call returns False.

        Returns:
            The path inside the staging directory the upload should be
            written to, or False when no marker file is present.

        Raises:
            exc.InvalidArgumentException: if the recorded name is not a
                single path component (for example it contains ``/`` or is
                ``..``), which would place the file outside the staging
                directory.
        """
        stage_file = '{}/filename.txt'.format(self.storage_folder)
        try:
            with open(stage_file, 'r') as f:
                filename = f.readline()
            os.remove(stage_file)
        except FileNotFoundError:
            return False
        # The name was supplied by the client; never let it steer the write
        # to somewhere other than this staging directory.
        if not self._is_single_component(filename):
            raise exc.InvalidArgumentException(
                'Invalid staging filename')
        return self.storage_folder + '/{}'.format(filename)

    @staticmethod
    def remove_directory(directory):
        """Delete the staging directory named ``directory``.

        Returns:
            ``directory`` unchanged, once the tree has been removed.

        Raises:
            FileNotFoundError: if no such staging directory exists, or if
                ``directory`` is not a single path component.
        """
        # ``directory`` comes straight from the request path; a value such as
        # '..' would otherwise make rmtree remove the parent of the root.
        if not Staging._is_single_component(directory):
            raise FileNotFoundError
        storage_folder = Staging.ASSET_ROOT + directory
        if os.path.exists(storage_folder):
            shutil.rmtree(storage_folder)
        else:
            raise FileNotFoundError
        return directory
async def handle_staging(pathcomponents, operation, configmanager, inputdata):
    '''Handle requests under the staging collection (async generator).

    e.g push_url: /confluent-api/staging/user/<unique_id>

    * ``create`` with one path component: allocate a new staging directory,
      record ``inputdata['filename']`` in it and yield a
      ``msg.CreatedResource`` carrying the push URL.
    * ``create`` with three path components: when ``inputdata`` has
      ``filedata`` and a filename was recorded, copy
      ``inputdata['content_length']`` bytes from
      ``inputdata['filedata']['wsgi.input']`` into the staged file, yielding
      ``msg.FileUploadProgress`` after every chunk and a final 100.
    * ``delete`` with three path components: remove the staging directory
      and yield ``msg.DeletedResource``.

    Raises:
        Exception: ``create`` without a ``filename``, or ``delete`` on a path
            that does not have three components.
        exc.InvalidArgumentException: the upload stream ended before
            ``content_length`` bytes were received.
    '''
    if operation == 'create':
        if len(pathcomponents) == 1:
            user = inputdata['user']
            # Validate before touching the filesystem so a bad request does
            # not leave an empty staging directory behind.
            if 'filename' not in inputdata:
                raise Exception('Error: Missing filename arg')
            stage = Staging(user, str(uuid.uuid1()))
            if stage.create_directory():
                data_file = stage.storage_folder + '/filename.txt'
                with open(data_file, 'w') as f:
                    f.write(inputdata['filename'])
                push_url = stage.get_push_url()
                yield msg.CreatedResource(push_url)
        elif len(pathcomponents) == 3:
            stage = Staging(pathcomponents[1], pathcomponents[2])
            target_path = stage.get_file_name()
            if 'filedata' in inputdata and target_path:
                content_length = inputdata['content_length']
                remaining_length = content_length
                filedata = inputdata['filedata']
                chunk_size = 16384
                progress = 0.0
                with open(target_path, 'wb') as f:
                    while remaining_length > 0:
                        progress = (1 - (remaining_length/content_length)) * 100
                        #TODO: ASYNC Need to change to aiohttp approach
                        datachunk = filedata['wsgi.input'].read(
                            min(chunk_size, remaining_length))
                        if not datachunk:
                            # An empty read means the stream is exhausted;
                            # without this check the loop would spin forever.
                            raise exc.InvalidArgumentException(
                                'Upload ended before all {0} bytes were '
                                'received'.format(content_length))
                        f.write(datachunk)
                        remaining_length -= len(datachunk)
                        # Give other tasks a chance to run between chunks.
                        await asyncio.sleep(0)
                        yield msg.FileUploadProgress(progress)
                yield msg.FileUploadProgress(100)
    elif operation == 'delete':
        if len(pathcomponents) == 3:
            asset = Staging.remove_directory(pathcomponents[2])
            yield msg.DeletedResource(asset)
        else:
            raise Exception("Invalid url")
async def handle_path(path, operation, configmanager, inputdata=None, autostrip=True):
    """Given a full path request, return an object.

    The plugins should generally return some sort of iterator.
    An exception is made for console/session, which should return
    a class with connect(), read(), write(bytes), and close()

    The path is split on '/', the leading (and any trailing) empty component
    is discarded, and the request is routed on the first remaining component.
    A path with no components at all yields the root collection list.

    Raises:
        exc.NotFoundException: unknown top-level collection, unknown user or
            usergroup, or an unknown element under ``events``.
        exc.InvalidArgumentException: a non-retrieve operation on ``events``.
    """
    pathcomponents = path.split('/')
    del pathcomponents[0]  # discard the value from leading /
    # Guard the index so a path with nothing after the split does not raise.
    if pathcomponents and pathcomponents[-1] == '':
        del pathcomponents[-1]
    if not pathcomponents:  # root collection list
        return enumerate_collections(rootcollections)
    elif pathcomponents[0] == 'noderange':
        return await handle_node_request(configmanager, inputdata, operation,
                                         pathcomponents, autostrip)
    elif pathcomponents[0] == 'deployment':
        return await handle_deployment(configmanager, inputdata, pathcomponents,
                                       operation)
    elif pathcomponents[0] == 'storage':
        return await handle_storage(configmanager, inputdata, pathcomponents,
                                    operation)
    elif pathcomponents[0] == 'nodegroups':
        return await handle_nodegroup_request(configmanager, inputdata,
                                              pathcomponents,
                                              operation)
    elif pathcomponents[0] == 'nodes':
        # single node request of some sort
        return await handle_node_request(configmanager, inputdata,
                                         operation, pathcomponents, autostrip)
    elif pathcomponents[0] == 'discovery':
        return await disco.handle_api_request(
            configmanager, inputdata, operation, pathcomponents)
    elif pathcomponents[0] == 'networking':
        return await macmap.handle_api_request(
            configmanager, inputdata, operation, pathcomponents)
    elif pathcomponents[0] == 'version':
        return (msg.Attributes(kv={'version': confluent.__version__}),)
    elif pathcomponents[0] == 'uuid':
        if operation == 'update':
            # Re-read the uuid from the site file and store it as the global.
            with open('/var/lib/confluent/public/site/confluent_uuid', 'r') as uuidf:
                fsuuid = uuidf.read().strip()
            cfm.set_global('confluent_uuid', fsuuid)
        return (msg.Attributes(kv={'uuid': cfm.get_global('confluent_uuid')}),)
    elif pathcomponents[0] == 'usergroups':
        # TODO: when non-administrator accounts exist,
        # they must only be allowed to see their own user
        try:
            usergroup = pathcomponents[1]
        except IndexError:  # it's just usergroups/
            if operation == 'create':
                inputdata = msg.get_input_message(
                    pathcomponents, operation, inputdata,
                    configmanager=configmanager)
                await create_usergroup(inputdata.attribs, configmanager)
            return iterate_collections(configmanager.list_usergroups(),
                                       forcecollection=False)
        if usergroup not in configmanager.list_usergroups():
            raise exc.NotFoundException("Invalid usergroup %s" % usergroup)
        if operation == 'retrieve':
            return show_usergroup(usergroup, configmanager)
        elif operation == 'delete':
            return delete_usergroup(usergroup, configmanager)
        elif operation == 'update':
            inputdata = msg.get_input_message(
                pathcomponents, operation, inputdata,
                configmanager=configmanager)
            await update_usergroup(usergroup, inputdata.attribs, configmanager)
            return show_usergroup(usergroup, configmanager)
    elif pathcomponents[0] == 'users':
        # TODO: when non-administrator accounts exist,
        # they must only be allowed to see their own user
        try:
            user = pathcomponents[1]
        except IndexError:  # it's just users/
            if operation == 'create':
                inputdata = msg.get_input_message(
                    pathcomponents, operation, inputdata,
                    configmanager=configmanager)
                await create_user(inputdata.attribs, configmanager)
            return iterate_collections(configmanager.list_users(),
                                       forcecollection=False)
        if user not in configmanager.list_users():
            raise exc.NotFoundException("Invalid user %s" % user)
        if operation == 'retrieve':
            return show_user(user, configmanager)
        elif operation == 'delete':
            return delete_user(user, configmanager)
        elif operation == 'update':
            inputdata = msg.get_input_message(
                pathcomponents, operation, inputdata,
                configmanager=configmanager)
            await update_user(user, inputdata.attribs, configmanager)
            return show_user(user, configmanager)
    elif pathcomponents[0] == 'events':
        try:
            element = pathcomponents[1]
        except IndexError:
            if operation != 'retrieve':
                raise exc.InvalidArgumentException('Target is read-only')
            return (msg.ChildCollection('decode'),)
        if element != 'decode':
            raise exc.NotFoundException()
        if operation == 'update':
            return await alerts.decode_alert(inputdata, configmanager)
    elif pathcomponents[0] == 'staging':
        # handle_staging is an async generator: hand it back for the caller
        # to iterate.  Awaiting an async generator object raises TypeError.
        return handle_staging(pathcomponents, operation, configmanager, inputdata)
    else:
        raise exc.NotFoundException()