mirror of
https://github.com/xcat2/confluent.git
synced 2026-02-20 06:34:26 +00:00
Address asyncio changes for consoleserver
This commit is contained in:
@@ -75,17 +75,22 @@ async def get_buffer_output(nodename):
|
||||
return bytes(outdata[:-1])
|
||||
|
||||
|
||||
def send_output(nodename, output):
|
||||
async def send_output(nodename, output):
|
||||
if not isinstance(nodename, bytes):
|
||||
nodename = nodename.encode('utf8')
|
||||
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
out.connect("\x00confluent-vtbuffer")
|
||||
out.send(struct.pack('I', len(nodename) | (1 << 29)))
|
||||
out.send(nodename)
|
||||
rdr, writer = await asyncio.open_unix_connection(sock=out)
|
||||
hdr = struct.pack('I', len(nodename) | (1 << 29))
|
||||
writer.write(hdr)
|
||||
writer.write(nodename)
|
||||
for chunk in chunk_output(output, 8192):
|
||||
out.send(struct.pack('I', len(chunk) | (2 << 29)))
|
||||
out.send(chunk)
|
||||
writer.write(struct.pack('I', len(chunk) | (2 << 29)))
|
||||
writer.write(chunk)
|
||||
await writer.drain()
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
def _utf8_normalize(data, decoder):
|
||||
# first we give the stateful decoder a crack at the byte stream,
|
||||
@@ -182,7 +187,7 @@ class ConsoleHandler(object):
|
||||
retrytime = 120
|
||||
return retrytime + (retrytime * random.random())
|
||||
|
||||
def feedbuffer(self, data):
|
||||
async def feedbuffer(self, data):
|
||||
if not isinstance(data, bytes):
|
||||
data = data.encode('utf-8')
|
||||
if self.pendingbytes is not None:
|
||||
@@ -190,11 +195,11 @@ class ConsoleHandler(object):
|
||||
self.pendingbytes = b''
|
||||
nodeid = self.termprefix + self.node
|
||||
try:
|
||||
send_output(nodeid, data)
|
||||
await send_output(nodeid, data)
|
||||
data = self.pendingbytes
|
||||
self.pendingbytes = None
|
||||
if data:
|
||||
send_output(nodeid, data)
|
||||
await send_output(nodeid, data)
|
||||
except Exception:
|
||||
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
@@ -267,7 +272,7 @@ class ConsoleHandler(object):
|
||||
if onlylogging:
|
||||
return
|
||||
else:
|
||||
self._ondemand()
|
||||
await self._ondemand()
|
||||
if logvalue in ('none', 'memory'):
|
||||
self._dologging = False
|
||||
if not self._isondemand or self.livesessions:
|
||||
@@ -287,8 +292,8 @@ class ConsoleHandler(object):
|
||||
else:
|
||||
self._console.ping()
|
||||
|
||||
def clearbuffer(self):
|
||||
self.feedbuffer(
|
||||
async def clearbuffer(self):
|
||||
await self.feedbuffer(
|
||||
'\x1bc[No data has been received from the remote console since ' \
|
||||
'connecting. This could\r\nbe due to having the console.logging ' \
|
||||
'attribute set to none or interactive,\r\nserial console not ' \
|
||||
@@ -305,7 +310,7 @@ class ConsoleHandler(object):
|
||||
self.connectionthread.cancel()
|
||||
self.connectionthread = None
|
||||
# clear the terminal buffer when disconnected
|
||||
self.clearbuffer()
|
||||
await self.clearbuffer()
|
||||
if self._console:
|
||||
self.log(
|
||||
logdata='console disconnected', ltype=log.DataTypes.event,
|
||||
@@ -315,10 +320,10 @@ class ConsoleHandler(object):
|
||||
self.connectstate = 'unconnected'
|
||||
await self._send_rcpts({'connectstate': self.connectstate})
|
||||
|
||||
def _ondemand(self):
|
||||
async def _ondemand(self):
|
||||
self._isondemand = True
|
||||
if not self.livesessions and self._console:
|
||||
self._disconnect()
|
||||
await self._disconnect()
|
||||
|
||||
def _connect(self):
|
||||
if not self._is_local:
|
||||
@@ -497,7 +502,7 @@ class ConsoleHandler(object):
|
||||
event=log.Events.clientdisconnect, eventdata=edata)
|
||||
await self._send_rcpts({'clientcount': len(self.livesessions)})
|
||||
if self._isondemand and not self.livesessions:
|
||||
self._disconnect()
|
||||
await self._disconnect()
|
||||
|
||||
|
||||
async def reopen(self):
|
||||
@@ -520,12 +525,12 @@ class ConsoleHandler(object):
|
||||
if self.clearpending or self.clearerror:
|
||||
self.clearpending = False
|
||||
self.clearerror = False
|
||||
self.feedbuffer(b'\x1bc\x1b[2J\x1b[1;1H')
|
||||
await self.feedbuffer(b'\x1bc\x1b[2J\x1b[1;1H')
|
||||
await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H')
|
||||
await self._send_rcpts(_utf8_normalize(data, self.utf8decoder))
|
||||
self.log(data, eventdata=eventdata)
|
||||
self.lasttime = util.monotonic_time()
|
||||
self.feedbuffer(data)
|
||||
await self.feedbuffer(data)
|
||||
|
||||
|
||||
async def _send_rcpts(self, data):
|
||||
|
||||
Reference in New Issue
Block a user