mirror of
https://github.com/xcat2/confluent.git
synced 2026-05-09 10:10:10 +00:00
bfc27595dc
If someone asks for it independently, we can break it out again. But for now, assume it's only for confluent.
442 lines
16 KiB
Python
442 lines
16 KiB
Python
# Copyright 2015-2019 Lenovo
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# This provides ability to do HTTPS in a manner like ssh host keys for the
|
|
# sake of typical internal management devices. Compatibility back to python
|
|
# 2.6 as is found in commonly used enterprise linux distributions.
|
|
|
|
import asyncio
|
|
import base64
|
|
import copy
|
|
import gzip
|
|
import io
|
|
import json
|
|
import os
|
|
import socket
|
|
import ssl
|
|
import threading
|
|
import traceback
|
|
|
|
from yarl import URL
|
|
import aiohmi.exceptions as pygexc
|
|
|
|
|
|
import aiohttp
|
|
from aiohttp.cookiejar import CookieJar
|
|
|
|
import http.client as httplib
|
|
import http.cookies as Cookie
|
|
|
|
# Used as the separator for form data
|
|
|
|
# We will frequently be dealing with the same data across many instances,
|
|
# consolidate forms to single memory location to get benefits..
|
|
uploadforms = {}
|
|
|
|
class CustomVerifier(aiohttp.Fingerprint):
|
|
def __init__(self, verifycallback):
|
|
self._certverify = verifycallback
|
|
|
|
def check(self, transport):
|
|
sslobj = transport.get_extra_info("ssl_object")
|
|
cert = sslobj.getpeercert(binary_form=True)
|
|
try:
|
|
if not self._certverify(cert):
|
|
raise pygexc.UnrecognizedCertificate('Unknown certificate',
|
|
cert)
|
|
except Exception:
|
|
transport.close()
|
|
raise
|
|
|
|
class Downloader:
|
|
@classmethod
|
|
def create(cls, filehandle):
|
|
self = cls()
|
|
self.contentlen = None
|
|
self._completed = False
|
|
self._filehandle = filehandle
|
|
self._xfertask = None
|
|
self.exc = None
|
|
return self
|
|
|
|
async def get_progress(self):
|
|
if self.contentlen is None:
|
|
return -0.5
|
|
offset = None
|
|
retries = 100
|
|
while offset is None:
|
|
if self._completed:
|
|
return 1.0
|
|
try:
|
|
offset = self._filehandle.tell()
|
|
except ValueError:
|
|
retries -= 1
|
|
if retries <= 0:
|
|
return -0.5
|
|
await asyncio.sleep(0.01)
|
|
return float(offset) / float(self.contentlen)
|
|
|
|
def mark_completed(self, fut):
|
|
self._completed = True
|
|
|
|
def set_task(self, task):
|
|
self._xfertask = task
|
|
|
|
def completed(self):
|
|
return self._completed
|
|
|
|
async def join(self, timeout=None):
|
|
if self._xfertask is None:
|
|
return
|
|
if timeout is None:
|
|
await self._xfertask
|
|
else:
|
|
await asyncio.wait_for(asyncio.shield(self._xfertask), timeout=timeout)
|
|
|
|
class Uploader(Downloader):
|
|
@classmethod
|
|
async def create(cls, filename, data=None, formname=None,
|
|
otherfields=(), formwrap=True):
|
|
self = cls()
|
|
self._response = None
|
|
self._statuscode = None
|
|
self._xfertask = None
|
|
self._completed = False
|
|
self._rspheaders = None
|
|
self.rsp = ''
|
|
self.rspstatus = 500
|
|
self.filename = filename
|
|
if data:
|
|
self.data = data
|
|
else:
|
|
self.data = open(filename, 'rb')
|
|
self.formname = formname
|
|
self.otherfields = otherfields
|
|
self.ulheaders = {}
|
|
if formwrap:
|
|
guf = await get_upload_form(
|
|
filename, self.data, formname, otherfields)
|
|
self._upbuffer = io.BytesIO(guf[0])
|
|
self._boundary = guf[1]
|
|
self.ulsize = len(uploadforms[filename][0])
|
|
self.ulheaders['Content-Type'] = 'multipart/form-data; boundary={0}'.format(
|
|
self._boundary.decode('utf-8'))
|
|
self.ulheaders['Content-Size'] = str(self.ulsize)
|
|
else:
|
|
canseek = True
|
|
try:
|
|
curroff = self.data.tell()
|
|
except Exception:
|
|
canseek = False
|
|
databytes = await asyncio.to_thread(self.data.read)
|
|
self.ulsize = len(databytes)
|
|
self._upbuffer = io.BytesIO(databytes)
|
|
if canseek:
|
|
self.data.seek(0, 2)
|
|
self.ulsize = self.data.tell() - curroff
|
|
self.data.seek(curroff, 0)
|
|
self._upbuffer = self.data
|
|
self.ulheaders['Content-Length'] = str(self.ulsize)
|
|
self.ulheaders['Content-Type'] = 'application/octet-stream'
|
|
return self
|
|
|
|
def set_response(self, statuscode, response, headers):
|
|
self._statuscode = statuscode
|
|
self._response = response
|
|
self._rspheaders = headers
|
|
|
|
def get_response(self):
|
|
return self._statuscode, self._response, self._rspheaders
|
|
|
|
def get_buffer(self):
|
|
return self._upbuffer
|
|
|
|
def get_headers(self):
|
|
return self.ulheaders
|
|
|
|
def get_size(self):
|
|
return self.ulsize
|
|
|
|
def close(self):
|
|
if self.filename in uploadforms:
|
|
try:
|
|
del uploadforms[self.filename]
|
|
except KeyError:
|
|
pass
|
|
try:
|
|
self.data.close()
|
|
except Exception:
|
|
pass
|
|
|
|
async def get_progress(self):
|
|
if self._completed:
|
|
return 1.0
|
|
if self._xfertask is None:
|
|
return 0.0
|
|
|
|
totalen = self.get_size()
|
|
if totalen is None:
|
|
return -0.5
|
|
offset = None
|
|
tries = 100
|
|
while offset is None:
|
|
if self._completed:
|
|
return 1.0
|
|
if self._xfertask is None:
|
|
return 0.0
|
|
try:
|
|
offset = self._upbuffer.tell()
|
|
except ValueError:
|
|
await asyncio.sleep(0.01)
|
|
tries -= 1
|
|
if tries <= 0:
|
|
return -0.5
|
|
return float(offset) / float(totalen)
|
|
|
|
def make_downloader(webconn, url, dlfile):
|
|
if isinstance(dlfile, str):
|
|
dlfile = open(dlfile, 'wb')
|
|
dler = Downloader.create(dlfile)
|
|
tsk = asyncio.create_task(webconn.download(url, dlfile, dler))
|
|
dler.set_task(tsk)
|
|
tsk.add_done_callback(dler.mark_completed)
|
|
return dler
|
|
|
|
async def make_uploader(webconn, url, filename, data=None, formname=None,
|
|
otherfields=(), formwrap=True):
|
|
uler = await Uploader.create(filename, data, formname, otherfields, formwrap)
|
|
tsk = asyncio.create_task(webconn.upload(
|
|
url, filename, uler.get_buffer(), uploader=uler))
|
|
uler.set_task(tsk)
|
|
tsk.add_done_callback(uler.mark_completed)
|
|
return uler
|
|
|
|
|
|
|
|
|
|
async def get_upload_form(filename, data, formname, otherfields, boundary=None):
|
|
if not boundary:
|
|
boundary = base64.urlsafe_b64encode(os.urandom(54))[:66]
|
|
ffilename = filename.split('/')[-1]
|
|
if not formname:
|
|
formname = ffilename
|
|
while uploadforms.get(filename, None) == 'pending':
|
|
await asyncio.sleep(0.1)
|
|
try:
|
|
return uploadforms[filename]
|
|
except KeyError:
|
|
uploadforms[filename] = 'pending'
|
|
try:
|
|
data = await asyncio.to_thread(data.read)
|
|
except AttributeError:
|
|
pass
|
|
return await asyncio.to_thread(assign_upload_form, filename, ffilename, data, formname, otherfields, boundary)
|
|
|
|
def assign_upload_form(filename, ffilename, data, formname, otherfields, boundary=None):
|
|
form = b''
|
|
for ofield in otherfields:
|
|
tfield = otherfields[ofield]
|
|
xtra=''
|
|
if isinstance(tfield, dict):
|
|
tfield = json.dumps(tfield)
|
|
xtra = '\r\nContent-Type: application/json'
|
|
form += (b'--' + boundary
|
|
+ '\r\nContent-Disposition: form-data; '
|
|
'name="{0}"{1}\r\n\r\n{2}\r\n'.format(
|
|
ofield, xtra, tfield).encode('utf-8'))
|
|
form += (b'--' + boundary
|
|
+ '\r\nContent-Disposition: form-data; '
|
|
'name="{0}"; filename="{1}"\r\n'.format(
|
|
formname, ffilename).encode('utf-8'))
|
|
form += b'Content-Type: application/octet-stream\r\n\r\n' + data
|
|
form += b'\r\n--' + boundary + b'--\r\n'
|
|
uploadforms[filename] = form, boundary
|
|
return uploadforms[filename]
|
|
|
|
|
|
class WebConnection:
|
|
def __init__(self, host, port, verifycallback=None, timeout=None):
|
|
self.port = port
|
|
self.thehost = host
|
|
if ':' in host and '[' not in host:
|
|
self.host = f'[{host}]'
|
|
else:
|
|
self.host = host
|
|
if verifycallback:
|
|
self.ssl = CustomVerifier(verifycallback)
|
|
else:
|
|
self.ssl = None
|
|
self.verifycallback = verifycallback
|
|
if isinstance(timeout, (int, float)):
|
|
self.timeout = aiohttp.ClientTimeout(total=timeout)
|
|
else:
|
|
self.timeout = timeout
|
|
self.stdheaders = {}
|
|
if '[' not in host and '%' in host:
|
|
self.stdheaders['Host'] = '[' + host.split('%', 1)[0] + ']'
|
|
self.cookies = CookieJar(quote_cookie=False, unsafe=True)
|
|
|
|
def set_timeout(self, timeout):
|
|
if isinstance(timeout, (int, float)):
|
|
self.timeout = aiohttp.ClientTimeout(total=timeout)
|
|
else:
|
|
self.timeout = timeout
|
|
|
|
def get_timeout(self):
|
|
return self.timeout
|
|
|
|
def set_header(self, key, value):
|
|
self.stdheaders[key] = value
|
|
|
|
def dupe(self, timeout=None):
|
|
newwc = WebConnection(self.host, self.port,
|
|
verifycallback=self.verifycallback, timeout=timeout or self.timeout)
|
|
newwc.stdheaders = self.stdheaders.copy()
|
|
newwc.cookies = CookieJar(quote_cookie=False, unsafe=True)
|
|
for cookie in self.cookies:
|
|
newwc.cookies.update_cookies(
|
|
{cookie.key: cookie.value}, response_url=URL(f'https://{self.host}:{self.port}/'))
|
|
return newwc
|
|
|
|
async def request(
|
|
self, method, url, body=None, headers=None, referer=None):
|
|
if headers is None:
|
|
headers = self.stdheaders.copy()
|
|
else:
|
|
headers = headers.copy()
|
|
if method == 'GET' and 'Content-Type' in headers:
|
|
del headers['Content-Type']
|
|
if method == 'POST' and body and 'Content-Type' not in headers:
|
|
headers['Content-Type'] = 'application/x-www-form-urlencoded'
|
|
if body and 'Content-Length' not in headers:
|
|
headers['Content-Length'] = len(body)
|
|
if referer:
|
|
headers['referer'] = referer
|
|
method = method.lower()
|
|
async with aiohttp.ClientSession(
|
|
f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
|
|
thefunc = getattr(session, method)
|
|
kwargs = {}
|
|
if isinstance(body, dict):
|
|
kwargs['json'] = body
|
|
elif body:
|
|
kwargs['data'] = body
|
|
async with thefunc(url, headers=headers, ssl=self.ssl, **kwargs) as rsp:
|
|
pass
|
|
|
|
def set_basic_credentials(self, username, password):
|
|
if isinstance(username, bytes) and not isinstance(username, str):
|
|
username = username.decode('utf-8')
|
|
if isinstance(password, bytes) and not isinstance(password, str):
|
|
password = password.decode('utf-8')
|
|
authinfo = ':'.join((username, password))
|
|
if not isinstance(authinfo, bytes):
|
|
authinfo = authinfo.encode('utf-8')
|
|
authinfo = base64.b64encode(authinfo)
|
|
if not isinstance(authinfo, str):
|
|
authinfo = authinfo.decode('utf-8')
|
|
self.stdheaders['Authorization'] = 'Basic {0}'.format(authinfo)
|
|
|
|
async def grab_json_response(self, url, data=None, referer=None, headers=None):
|
|
self.lastjsonerror = None
|
|
body, status = await self.grab_json_response_with_status(
|
|
url, data, referer, headers)
|
|
if status == 200:
|
|
return body
|
|
self.lastjsonerror = body
|
|
return {}
|
|
|
|
async def grab_json_response_with_status(self, url, data=None, referer=None,
|
|
headers=None, method=None):
|
|
rsp, status, hdrs = await self.grab_response_with_status(url, data, referer, headers, method, expect_type='json')
|
|
return rsp, status
|
|
|
|
async def grab_response_with_status(self, url, data=None, referer=None,
|
|
headers=None, method=None, expect_type=None):
|
|
if not headers:
|
|
headers = self.stdheaders.copy()
|
|
else:
|
|
headers = headers.copy()
|
|
if referer:
|
|
headers['referer'] = referer
|
|
if not method:
|
|
method = 'POST' if data is not None else 'GET'
|
|
method = method.lower()
|
|
if 'Content-Type' in headers and method.lower() in ('get', 'delete'):
|
|
del headers['Content-Type']
|
|
async with aiohttp.ClientSession(f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
|
|
thefunc = getattr(session, method)
|
|
kwargs = {}
|
|
if isinstance(data, dict):
|
|
kwargs['json'] = data
|
|
if 'Content-Type' not in headers:
|
|
headers['Content-Type'] = 'application/json'
|
|
elif data is not None:
|
|
kwargs['data'] = data
|
|
async with thefunc(url, headers=headers, ssl=self.ssl, **kwargs) as rsp:
|
|
if rsp.status >= 200 and rsp.status < 300:
|
|
if expect_type == 'json':
|
|
return await rsp.json(content_type=''), rsp.status, rsp.headers
|
|
elif expect_type == 'text':
|
|
return await rsp.text(), rsp.status, rsp.headers
|
|
else:
|
|
return await rsp.read(), rsp.status, rsp.headers
|
|
else:
|
|
return await rsp.read(), rsp.status, rsp.headers
|
|
|
|
async def download(self, url, dlfile, downloader=None):
|
|
"""Download a file to filename or file object
|
|
|
|
"""
|
|
if isinstance(dlfile, str):
|
|
dlfile = open(dlfile, 'wb')
|
|
dlheaders = self.stdheaders.copy()
|
|
if 'Accept-Encoding' in dlheaders:
|
|
del dlheaders['Accept-Encoding']
|
|
async with aiohttp.ClientSession(f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
|
|
async with session.get(url, headers=dlheaders, ssl=self.ssl) as rsp:
|
|
if downloader:
|
|
downloader.contentlen = rsp.headers.get('content-length', None)
|
|
try:
|
|
downloader.contentlen = int(downloader.contentlen)
|
|
except Exception:
|
|
downloader.contentlen = None
|
|
async for chunk in rsp.content.iter_chunked(16384):
|
|
dlfile.write(chunk)
|
|
dlfile.close()
|
|
|
|
async def upload(self, url, ulfile, data=None, uploader=None):
|
|
upheaders = self.stdheaders.copy()
|
|
if uploader:
|
|
upheaders.update(uploader.get_headers())
|
|
data = uploader.get_buffer()
|
|
else:
|
|
raise Exception("Not implemented without uploader handler")
|
|
async with aiohttp.ClientSession(f'https://{self.host}:{self.port}', cookie_jar=self.cookies, timeout=self.timeout) as session:
|
|
async with session.post(url, headers=upheaders, ssl=self.ssl, data=data) as rsp:
|
|
if rsp.status >= 200 and rsp.status < 300:
|
|
expect_type = rsp.headers.get('Content-Type', '')
|
|
if 'json' in expect_type:
|
|
uploader.set_response(rsp.status, await rsp.json(content_type=''), rsp.headers)
|
|
elif 'text' in expect_type:
|
|
uploader.set_response(rsp.status, await rsp.text(), rsp.headers)
|
|
else:
|
|
uploader.set_response(rsp.status, await rsp.read(), rsp.headers)
|
|
else:
|
|
uploader.set_response(rsp.status, await rsp.read(), rsp.headers)
|
|
if uploader:
|
|
uploader.close()
|
|
return uploader._statuscode
|
|
|