2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-05-01 04:47:45 +00:00
Files
confluent/confluent_server/aiohmi/util/webclient.py
Jarrod Johnson bfc27595dc Fold aiohmi into confluent
If someone asks for it independently, we can break it out again.  But for now,
assume it's only for confluent.
2026-04-30 08:48:24 -04:00

442 lines
16 KiB
Python

# Copyright 2015-2019 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This provides ability to do HTTPS in a manner like ssh host keys for the
# sake of typical internal management devices. Compatibility back to python
# 2.6 as is found in commonly used enterprise linux distributions.
import asyncio
import base64
import copy
import gzip
import io
import json
import os
import socket
import ssl
import threading
import traceback
from yarl import URL
import aiohmi.exceptions as pygexc
import aiohttp
from aiohttp.cookiejar import CookieJar
import http.client as httplib
import http.cookies as Cookie
# Used as the separator for form data
# We will frequently be dealing with the same data across many instances,
# consolidate forms to single memory location to get benefits..
uploadforms = {}
class CustomVerifier(aiohttp.Fingerprint):
def __init__(self, verifycallback):
self._certverify = verifycallback
def check(self, transport):
sslobj = transport.get_extra_info("ssl_object")
cert = sslobj.getpeercert(binary_form=True)
try:
if not self._certverify(cert):
raise pygexc.UnrecognizedCertificate('Unknown certificate',
cert)
except Exception:
transport.close()
raise
class Downloader:
@classmethod
def create(cls, filehandle):
self = cls()
self.contentlen = None
self._completed = False
self._filehandle = filehandle
self._xfertask = None
self.exc = None
return self
async def get_progress(self):
if self.contentlen is None:
return -0.5
offset = None
retries = 100
while offset is None:
if self._completed:
return 1.0
try:
offset = self._filehandle.tell()
except ValueError:
retries -= 1
if retries <= 0:
return -0.5
await asyncio.sleep(0.01)
return float(offset) / float(self.contentlen)
def mark_completed(self, fut):
self._completed = True
def set_task(self, task):
self._xfertask = task
def completed(self):
return self._completed
async def join(self, timeout=None):
if self._xfertask is None:
return
if timeout is None:
await self._xfertask
else:
await asyncio.wait_for(asyncio.shield(self._xfertask), timeout=timeout)
class Uploader(Downloader):
@classmethod
async def create(cls, filename, data=None, formname=None,
otherfields=(), formwrap=True):
self = cls()
self._response = None
self._statuscode = None
self._xfertask = None
self._completed = False
self._rspheaders = None
self.rsp = ''
self.rspstatus = 500
self.filename = filename
if data:
self.data = data
else:
self.data = open(filename, 'rb')
self.formname = formname
self.otherfields = otherfields
self.ulheaders = {}
if formwrap:
guf = await get_upload_form(
filename, self.data, formname, otherfields)
self._upbuffer = io.BytesIO(guf[0])
self._boundary = guf[1]
self.ulsize = len(uploadforms[filename][0])
self.ulheaders['Content-Type'] = 'multipart/form-data; boundary={0}'.format(
self._boundary.decode('utf-8'))
self.ulheaders['Content-Size'] = str(self.ulsize)
else:
canseek = True
try:
curroff = self.data.tell()
except Exception:
canseek = False
databytes = await asyncio.to_thread(self.data.read)
self.ulsize = len(databytes)
self._upbuffer = io.BytesIO(databytes)
if canseek:
self.data.seek(0, 2)
self.ulsize = self.data.tell() - curroff
self.data.seek(curroff, 0)
self._upbuffer = self.data
self.ulheaders['Content-Length'] = str(self.ulsize)
self.ulheaders['Content-Type'] = 'application/octet-stream'
return self
def set_response(self, statuscode, response, headers):
self._statuscode = statuscode
self._response = response
self._rspheaders = headers
def get_response(self):
return self._statuscode, self._response, self._rspheaders
def get_buffer(self):
return self._upbuffer
def get_headers(self):
return self.ulheaders
def get_size(self):
return self.ulsize
def close(self):
if self.filename in uploadforms:
try:
del uploadforms[self.filename]
except KeyError:
pass
try:
self.data.close()
except Exception:
pass
async def get_progress(self):
if self._completed:
return 1.0
if self._xfertask is None:
return 0.0
totalen = self.get_size()
if totalen is None:
return -0.5
offset = None
tries = 100
while offset is None:
if self._completed:
return 1.0
if self._xfertask is None:
return 0.0
try:
offset = self._upbuffer.tell()
except ValueError:
await asyncio.sleep(0.01)
tries -= 1
if tries <= 0:
return -0.5
return float(offset) / float(totalen)
def make_downloader(webconn, url, dlfile):
if isinstance(dlfile, str):
dlfile = open(dlfile, 'wb')
dler = Downloader.create(dlfile)
tsk = asyncio.create_task(webconn.download(url, dlfile, dler))
dler.set_task(tsk)
tsk.add_done_callback(dler.mark_completed)
return dler
async def make_uploader(webconn, url, filename, data=None, formname=None,
otherfields=(), formwrap=True):
uler = await Uploader.create(filename, data, formname, otherfields, formwrap)
tsk = asyncio.create_task(webconn.upload(
url, filename, uler.get_buffer(), uploader=uler))
uler.set_task(tsk)
tsk.add_done_callback(uler.mark_completed)
return uler
async def get_upload_form(filename, data, formname, otherfields, boundary=None):
if not boundary:
boundary = base64.urlsafe_b64encode(os.urandom(54))[:66]
ffilename = filename.split('/')[-1]
if not formname:
formname = ffilename
while uploadforms.get(filename, None) == 'pending':
await asyncio.sleep(0.1)
try:
return uploadforms[filename]
except KeyError:
uploadforms[filename] = 'pending'
try:
data = await asyncio.to_thread(data.read)
except AttributeError:
pass
return await asyncio.to_thread(assign_upload_form, filename, ffilename, data, formname, otherfields, boundary)
def assign_upload_form(filename, ffilename, data, formname, otherfields, boundary=None):
form = b''
for ofield in otherfields:
tfield = otherfields[ofield]
xtra=''
if isinstance(tfield, dict):
tfield = json.dumps(tfield)
xtra = '\r\nContent-Type: application/json'
form += (b'--' + boundary
+ '\r\nContent-Disposition: form-data; '
'name="{0}"{1}\r\n\r\n{2}\r\n'.format(
ofield, xtra, tfield).encode('utf-8'))
form += (b'--' + boundary
+ '\r\nContent-Disposition: form-data; '
'name="{0}"; filename="{1}"\r\n'.format(
formname, ffilename).encode('utf-8'))
form += b'Content-Type: application/octet-stream\r\n\r\n' + data
form += b'\r\n--' + boundary + b'--\r\n'
uploadforms[filename] = form, boundary
return uploadforms[filename]
class WebConnection:
def __init__(self, host, port, verifycallback=None, timeout=None):
self.port = port
self.thehost = host
if ':' in host and '[' not in host:
self.host = f'[{host}]'
else:
self.host = host
if verifycallback:
self.ssl = CustomVerifier(verifycallback)
else:
self.ssl = None
self.verifycallback = verifycallback
if isinstance(timeout, (int, float)):
self.timeout = aiohttp.ClientTimeout(total=timeout)
else:
self.timeout = timeout
self.stdheaders = {}
if '[' not in host and '%' in host:
self.stdheaders['Host'] = '[' + host.split('%', 1)[0] + ']'
self.cookies = CookieJar(quote_cookie=False, unsafe=True)
def set_timeout(self, timeout):
if isinstance(timeout, (int, float)):
self.timeout = aiohttp.ClientTimeout(total=timeout)
else:
self.timeout = timeout
def get_timeout(self):
return self.timeout
def set_header(self, key, value):
self.stdheaders[key] = value
def dupe(self, timeout=None):
newwc = WebConnection(self.host, self.port,
verifycallback=self.verifycallback, timeout=timeout or self.timeout)
newwc.stdheaders = self.stdheaders.copy()
newwc.cookies = CookieJar(quote_cookie=False, unsafe=True)
for cookie in self.cookies:
newwc.cookies.update_cookies(
{cookie.key: cookie.value}, response_url=URL(f'https://{self.host}:{self.port}/'))
return newwc
async def request(
self, method, url, body=None, headers=None, referer=None):
if headers is None:
headers = self.stdheaders.copy()
else:
headers = headers.copy()
if method == 'GET' and 'Content-Type' in headers:
del headers['Content-Type']
if method == 'POST' and body and 'Content-Type' not in headers:
headers['Content-Type'] = 'application/x-www-form-urlencoded'
if body and 'Content-Length' not in headers:
headers['Content-Length'] = len(body)
if referer:
headers['referer'] = referer
method = method.lower()
async with aiohttp.ClientSession(
f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
thefunc = getattr(session, method)
kwargs = {}
if isinstance(body, dict):
kwargs['json'] = body
elif body:
kwargs['data'] = body
async with thefunc(url, headers=headers, ssl=self.ssl, **kwargs) as rsp:
pass
def set_basic_credentials(self, username, password):
if isinstance(username, bytes) and not isinstance(username, str):
username = username.decode('utf-8')
if isinstance(password, bytes) and not isinstance(password, str):
password = password.decode('utf-8')
authinfo = ':'.join((username, password))
if not isinstance(authinfo, bytes):
authinfo = authinfo.encode('utf-8')
authinfo = base64.b64encode(authinfo)
if not isinstance(authinfo, str):
authinfo = authinfo.decode('utf-8')
self.stdheaders['Authorization'] = 'Basic {0}'.format(authinfo)
async def grab_json_response(self, url, data=None, referer=None, headers=None):
self.lastjsonerror = None
body, status = await self.grab_json_response_with_status(
url, data, referer, headers)
if status == 200:
return body
self.lastjsonerror = body
return {}
async def grab_json_response_with_status(self, url, data=None, referer=None,
headers=None, method=None):
rsp, status, hdrs = await self.grab_response_with_status(url, data, referer, headers, method, expect_type='json')
return rsp, status
async def grab_response_with_status(self, url, data=None, referer=None,
headers=None, method=None, expect_type=None):
if not headers:
headers = self.stdheaders.copy()
else:
headers = headers.copy()
if referer:
headers['referer'] = referer
if not method:
method = 'POST' if data is not None else 'GET'
method = method.lower()
if 'Content-Type' in headers and method.lower() in ('get', 'delete'):
del headers['Content-Type']
async with aiohttp.ClientSession(f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
thefunc = getattr(session, method)
kwargs = {}
if isinstance(data, dict):
kwargs['json'] = data
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
elif data is not None:
kwargs['data'] = data
async with thefunc(url, headers=headers, ssl=self.ssl, **kwargs) as rsp:
if rsp.status >= 200 and rsp.status < 300:
if expect_type == 'json':
return await rsp.json(content_type=''), rsp.status, rsp.headers
elif expect_type == 'text':
return await rsp.text(), rsp.status, rsp.headers
else:
return await rsp.read(), rsp.status, rsp.headers
else:
return await rsp.read(), rsp.status, rsp.headers
async def download(self, url, dlfile, downloader=None):
"""Download a file to filename or file object
"""
if isinstance(dlfile, str):
dlfile = open(dlfile, 'wb')
dlheaders = self.stdheaders.copy()
if 'Accept-Encoding' in dlheaders:
del dlheaders['Accept-Encoding']
async with aiohttp.ClientSession(f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
async with session.get(url, headers=dlheaders, ssl=self.ssl) as rsp:
if downloader:
downloader.contentlen = rsp.headers.get('content-length', None)
try:
downloader.contentlen = int(downloader.contentlen)
except Exception:
downloader.contentlen = None
async for chunk in rsp.content.iter_chunked(16384):
dlfile.write(chunk)
dlfile.close()
async def upload(self, url, ulfile, data=None, uploader=None):
upheaders = self.stdheaders.copy()
if uploader:
upheaders.update(uploader.get_headers())
data = uploader.get_buffer()
else:
raise Exception("Not implemented without uploader handler")
async with aiohttp.ClientSession(f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
async with session.post(url, headers=upheaders, ssl=self.ssl, data=data) as rsp:
if rsp.status >= 200 and rsp.status < 300:
expect_type = rsp.headers.get('Content-Type', '')
if 'json' in expect_type:
uploader.set_response(rsp.status, await rsp.json(content_type=''), rsp.headers)
elif 'text' in expect_type:
uploader.set_response(rsp.status, await rsp.text(), rsp.headers)
else:
uploader.set_response(rsp.status, await rsp.read(), rsp.headers)
else:
uploader.set_response(rsp.status, await rsp.read(), rsp.headers)
if uploader:
uploader.close()
return uploader._statuscode