mirror of
https://github.com/xcat2/confluent.git
synced 2026-06-01 07:51:33 +00:00
Remove eventlet from dependencies
This commit is contained in:
@@ -18,10 +18,7 @@
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
#import eventlet
|
||||
#import eventlet.hubs
|
||||
#eventlet.hubs.use_hub("eventlet.hubs.asyncio")
|
||||
#from eventlet.asyncio import spawn_for_awaitable
|
||||
|
||||
|
||||
|
||||
path = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
@@ -38,9 +38,9 @@ if [ "$OPKGNAME" = "confluent-server" ]; then
|
||||
if grep wheezy /etc/os-release; then
|
||||
sed -i 's/^\(Depends:.*\)/\1, python-confluent-client, python-lxml, python-eficompressor, python-pycryptodomex, python-dateutil, python-pyopenssl, python-msgpack/' debian/control
|
||||
elif grep jammy /etc/os-release; then
|
||||
sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-eventlet, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil/' debian/control
|
||||
sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-aiohttp, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil/' debian/control
|
||||
else
|
||||
sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-eventlet, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil, python3-pyasyncore/' debian/control
|
||||
sed -i 's/^\(Depends:.*\)/\1, confluent-client, python3-lxml, python3-eficompressor, python3-pycryptodome, python3-websocket, python3-msgpack, python3-aiohttp, python3-pyparsing, python3-pyghmi(>=1.5.71), python3-paramiko, python3-pysnmp4, python3-libarchive-c, confluent-vtbufferd, python3-netifaces, python3-yaml, python3-dateutil, python3-pyasyncore/' debian/control
|
||||
fi
|
||||
if grep wheezy /etc/os-release; then
|
||||
echo 'confluent_client python-confluent-client' >> debian/pydist-overrides
|
||||
|
||||
@@ -34,7 +34,7 @@ import confluent.exceptions as exc
|
||||
import confluent.lookuptools as lookuptools
|
||||
import confluent.core
|
||||
|
||||
def decode_alert(varbinds, configmanager):
|
||||
async def decode_alert(varbinds, configmanager):
|
||||
"""Decode an SNMP alert for a server
|
||||
|
||||
Given the agentaddr, OID for the trap, and a dict of varbinds,
|
||||
@@ -49,11 +49,11 @@ def decode_alert(varbinds, configmanager):
|
||||
agentaddr = varbinds['.1.3.6.1.6.3.18.1.3.0']
|
||||
except KeyError:
|
||||
agentaddr = varbinds['1.3.6.1.6.3.18.1.3.0']
|
||||
node = lookuptools.node_by_manager(agentaddr)
|
||||
node = await lookuptools.node_by_manager(agentaddr)
|
||||
if node is None:
|
||||
raise exc.InvalidArgumentException(
|
||||
'Unable to find a node with specified manager')
|
||||
return confluent.core.handle_path(
|
||||
return await confluent.core.handle_path(
|
||||
'/nodes/{0}/events/hardware/decode'.format(node), 'update',
|
||||
configmanager, varbinds, autostrip=False)
|
||||
|
||||
|
||||
@@ -305,12 +305,6 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None,
|
||||
# throw it at the worker pool when implemented
|
||||
# maybe a distinct worker pool, wondering about starving out non-auth stuff
|
||||
salt, crypt = ucfg['cryptpass']
|
||||
# execute inside tpool to get greenthreads to give it a special thread
|
||||
# world
|
||||
# TODO(jbjohnso): util function to generically offload a call
|
||||
# such a beast could be passed into pyghmi as a way for pyghmi to
|
||||
# magically get offload of the crypto functions without having
|
||||
# to explicitly get into the eventlet tpool game
|
||||
global authworkers
|
||||
global authcleaner
|
||||
if authworkers is None:
|
||||
|
||||
@@ -570,21 +570,23 @@ class ConsoleHandler(object):
|
||||
await self._got_disconnected()
|
||||
|
||||
|
||||
def disconnect_node(node, configmanager):
|
||||
async def disconnect_node(node, configmanager):
|
||||
consk = (node, configmanager.tenant)
|
||||
if consk in _handled_consoles:
|
||||
_handled_consoles[consk].close()
|
||||
await _handled_consoles[consk].close()
|
||||
del _handled_consoles[consk]
|
||||
|
||||
|
||||
def _nodechange(added, deleting, renamed, configmanager):
|
||||
async def _replace_node(old, new, cfm):
|
||||
await disconnect_node(old, cfm)
|
||||
await connect_node(new, cfm)
|
||||
for node in deleting:
|
||||
eventlet.spawn(disconnect_node, node, configmanager)
|
||||
tasks.spawn(disconnect_node(node, configmanager))
|
||||
for node in renamed:
|
||||
disconnect_node(node, configmanager)
|
||||
eventlet.spawn(connect_node, renamed[node], configmanager)
|
||||
tasks.spawn(_replace_node(node, renamed[node], configmanager))
|
||||
for node in added:
|
||||
eventlet.spawn(connect_node, node, configmanager)
|
||||
tasks.spawn(connect_node(node, configmanager))
|
||||
|
||||
|
||||
def _start_tenant_sessions(cfm):
|
||||
@@ -595,7 +597,7 @@ def _start_tenant_sessions(cfm):
|
||||
if manager and collective.get_myname() != manager:
|
||||
continue
|
||||
try:
|
||||
connect_node(node, cfm)
|
||||
await connect_node(node, cfm)
|
||||
except:
|
||||
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
@@ -617,7 +619,7 @@ async def start_console_sessions():
|
||||
configmodule.hook_new_configmanagers(_start_tenant_sessions)
|
||||
|
||||
|
||||
def connect_node(node, configmanager, username=None, direct=True, width=80,
|
||||
async def connect_node(node, configmanager, username=None, direct=True, width=80,
|
||||
height=24):
|
||||
attrval = configmanager.get_node_attributes(node, 'collective.manager')
|
||||
myc = attrval.get(node, {}).get('collective.manager', {}).get(
|
||||
|
||||
@@ -1404,7 +1404,7 @@ class Staging:
|
||||
raise FileNotFoundError
|
||||
return directory
|
||||
|
||||
def handle_staging(pathcomponents, operation, configmanager, inputdata):
|
||||
async def handle_staging(pathcomponents, operation, configmanager, inputdata):
|
||||
'''
|
||||
e.g push_url: /confluent-api/staging/user/<unique_id>
|
||||
'''
|
||||
@@ -1433,10 +1433,11 @@ def handle_staging(pathcomponents, operation, configmanager, inputdata):
|
||||
with open(file, 'wb') as f:
|
||||
while remaining_length > 0:
|
||||
progress = (1 - (remaining_length/content_length)) * 100
|
||||
#TODO: ASYNC Need to change to aiohttp approach
|
||||
datachunk = filedata['wsgi.input'].read(min(chunk_size, remaining_length))
|
||||
f.write(datachunk)
|
||||
remaining_length -= len(datachunk)
|
||||
eventlet.sleep(0)
|
||||
await asyncio.sleep(0)
|
||||
yield msg.FileUploadProgress(progress)
|
||||
yield msg.FileUploadProgress(100)
|
||||
|
||||
@@ -1552,11 +1553,11 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip=
|
||||
if element != 'decode':
|
||||
raise exc.NotFoundException()
|
||||
if operation == 'update':
|
||||
return alerts.decode_alert(inputdata, configmanager)
|
||||
return await alerts.decode_alert(inputdata, configmanager)
|
||||
elif pathcomponents[0] == 'discovery':
|
||||
return handle_discovery(pathcomponents[1:], operation, configmanager,
|
||||
inputdata)
|
||||
elif pathcomponents[0] == 'staging':
|
||||
return handle_staging(pathcomponents, operation, configmanager, inputdata)
|
||||
return await handle_staging(pathcomponents, operation, configmanager, inputdata)
|
||||
else:
|
||||
raise exc.NotFoundException()
|
||||
|
||||
@@ -549,11 +549,9 @@ async def register_remote_addrs(addresses, configmanager):
|
||||
except Exception:
|
||||
return addr, False
|
||||
return addr, True
|
||||
#rpool = eventlet.greenpool.GreenPool(512)
|
||||
for count in iterate_addrs(addresses, True):
|
||||
yield msg.ConfluentResourceCount(count)
|
||||
return # ASYNC
|
||||
for result in rpool.imap(register_remote_addr, iterate_addrs(addresses)):
|
||||
async for result in tasks.task_imap(register_remote_addr, iterate_addrs(addresses), max_concurrent=512):
|
||||
if result[1]:
|
||||
yield msg.CreatedResource(result[0])
|
||||
else:
|
||||
@@ -894,7 +892,7 @@ async def detected(info):
|
||||
rechecker = tasks.spawn_task_after(300, _periodic_recheck, cfg)
|
||||
unknown_info[info['hwaddr']] = info
|
||||
info['discostatus'] = 'unidentfied'
|
||||
#TODO, eventlet spawn after to recheck sooner, or somehow else
|
||||
#TODO, spawn after to recheck sooner, or somehow else
|
||||
# influence periodic recheck to shorten delay?
|
||||
return
|
||||
nodename, info['maccount'] = await get_nodename(cfg, handler, info)
|
||||
@@ -1715,9 +1713,7 @@ def stop_autosense():
|
||||
|
||||
def start_autosense():
|
||||
autosensors.add(tasks.spawn_task(slp.snoop(safe_detected, slp)))
|
||||
#autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns))
|
||||
tasks.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid))
|
||||
#autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid))
|
||||
tasks.spawn(remotescan())
|
||||
|
||||
|
||||
|
||||
@@ -44,8 +44,8 @@ import os
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
import confluent.tasks as tasks
|
||||
|
||||
libc = ctypes.CDLL(ctypes.util.find_library('c'))
|
||||
|
||||
@@ -871,12 +871,12 @@ async def reply_dhcp4(node, info, packet, cfg, reqview, httpboot, cfd, profile,
|
||||
ipinfo = 'without address, served from {0}'.format(myip)
|
||||
if relayipa:
|
||||
ipinfo += ' (relayed to {} via {})'.format(relayipa, requestor[0])
|
||||
eventlet.spawn(send_rsp, repview, replen, requestor, relayip, reqview, info, deferanswer, isboot, node, boottype, ipinfo, sock)
|
||||
tasks.spawn(send_rsp(repview, replen, requestor, relayip, reqview, info, deferanswer, isboot, node, boottype, ipinfo, sock))
|
||||
|
||||
|
||||
def send_rsp(repview, replen, requestor, relayip, reqview, info, defertxid, isboot, node, boottype, ipinfo, sock):
|
||||
async def send_rsp(repview, replen, requestor, relayip, reqview, info, defertxid, isboot, node, boottype, ipinfo, sock):
|
||||
if defertxid:
|
||||
eventlet.sleep(0.5)
|
||||
await asyncio.sleep(0.5)
|
||||
if defertxid in _recent_txids:
|
||||
log.log({'info': 'Skipping reply for {} over interface {} due to better offer being made over other interface'.format(node, info['netinfo']['ifidx'])})
|
||||
return
|
||||
|
||||
@@ -17,9 +17,9 @@
|
||||
#This handles port forwarding for web interfaces on management devices
|
||||
#It will also hijack port 3900 and do best effort..
|
||||
|
||||
import eventlet
|
||||
import eventlet.green.select as select
|
||||
import eventlet.green.socket as socket
|
||||
import asyncio
|
||||
import socket
|
||||
import confluent.tasks as tasks
|
||||
forwardersbyclient = {}
|
||||
relaysbysession = {}
|
||||
sessionsbyip = {}
|
||||
@@ -28,24 +28,37 @@ sockhandler = {}
|
||||
vidtargetbypeer = {}
|
||||
vidforwarder = None
|
||||
|
||||
def handle_connection(incoming, outgoing):
|
||||
while True:
|
||||
r, _, _ = select.select((incoming, outgoing), (), (), 60)
|
||||
for mysock in r:
|
||||
data = mysock.recv(32768)
|
||||
if not data:
|
||||
incoming.close()
|
||||
outgoing.close()
|
||||
return
|
||||
if mysock == incoming:
|
||||
outgoing.sendall(data)
|
||||
elif mysock == outgoing:
|
||||
incoming.sendall(data)
|
||||
async def handle_connection(incoming, outgoing):
|
||||
async def _relay(reader, writer):
|
||||
try:
|
||||
while True:
|
||||
data = await reader.read(32768)
|
||||
if not data:
|
||||
return
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except (ConnectionError, OSError):
|
||||
return
|
||||
|
||||
inrdr, inwriter = await asyncio.open_connection(sock=incoming)
|
||||
outrdr, outwriter = await asyncio.open_connection(sock=outgoing)
|
||||
try:
|
||||
done, pending = await asyncio.wait(
|
||||
[asyncio.ensure_future(_relay(inrdr, outwriter)),
|
||||
asyncio.ensure_future(_relay(outrdr, inwriter))],
|
||||
return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
finally:
|
||||
inwriter.close()
|
||||
outwriter.close()
|
||||
|
||||
|
||||
def forward_port(sock, target, clientip, sessionid):
|
||||
async def forward_port(sock, target, clientip, sessionid):
|
||||
loop = asyncio.get_event_loop()
|
||||
sock.setblocking(False)
|
||||
while True:
|
||||
conn, cli = sock.accept()
|
||||
conn, cli = await loop.sock_accept(sock)
|
||||
if cli[0] != clientip:
|
||||
conn.close()
|
||||
continue
|
||||
@@ -57,14 +70,19 @@ def forward_port(sock, target, clientip, sessionid):
|
||||
continue
|
||||
if sessionid not in relaysbysession:
|
||||
relaysbysession[sessionid] = {}
|
||||
relaysbysession[sessionid][eventlet.spawn(
|
||||
handle_connection, conn, client)] = conn
|
||||
relaysbysession[sessionid][tasks.spawn(
|
||||
handle_connection(conn, client))] = conn
|
||||
|
||||
|
||||
def forward_video():
|
||||
sock = eventlet.listen(('::', 3900, 0, 0), family=socket.AF_INET6)
|
||||
async def forward_video():
|
||||
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind(('::', 3900, 0, 0))
|
||||
sock.listen(50)
|
||||
loop = asyncio.get_event_loop()
|
||||
sock.setblocking(False)
|
||||
while True:
|
||||
conn, cli = sock.accept()
|
||||
conn, cli = await loop.sock_accept(sock)
|
||||
if cli[0] not in vidtargetbypeer or not sessionsbyip.get(cli[0], None):
|
||||
conn.close()
|
||||
continue
|
||||
@@ -76,7 +94,7 @@ def forward_video():
|
||||
conn.close()
|
||||
vidclient.close()
|
||||
continue
|
||||
eventlet.spawn_n(handle_connection, conn, vidclient)
|
||||
tasks.spawn(handle_connection(conn, vidclient))
|
||||
|
||||
|
||||
def close_session(sessionid):
|
||||
@@ -124,10 +142,10 @@ def get_port(addr, clientip, sessionid):
|
||||
newport += 1
|
||||
continue
|
||||
forwardersbyclient[sessionid][addr] = newsock
|
||||
sockhandler[newsock] = eventlet.spawn(forward_port, newsock, addr,
|
||||
clientip, sessionid)
|
||||
sockhandler[newsock] = tasks.spawn(forward_port(newsock, addr,
|
||||
clientip, sessionid))
|
||||
if not vidforwarder:
|
||||
vidforwarder = eventlet.spawn(forward_video)
|
||||
vidforwarder = tasks.spawn(forward_video())
|
||||
vidtargetbypeer[clientip] = addr
|
||||
return forwardersbyclient[sessionid][addr].getsockname()[1]
|
||||
|
||||
|
||||
@@ -25,13 +25,13 @@
|
||||
# service should have a null tenant and a tenant entry that correlates)
|
||||
__author__ = 'jjohnson2'
|
||||
|
||||
import asyncio
|
||||
import confluent.config.configmanager as configmanager
|
||||
import itertools
|
||||
from eventlet.support import greendns
|
||||
|
||||
manager_to_nodemap = {}
|
||||
|
||||
def node_by_manager(manager):
|
||||
async def node_by_manager(manager):
|
||||
"""Lookup a node by manager
|
||||
|
||||
Search for a node according to a given network address.
|
||||
@@ -46,7 +46,7 @@ def node_by_manager(manager):
|
||||
"""
|
||||
|
||||
manageraddresses = []
|
||||
for tmpaddr in greendns.getaddrinfo(manager, None):
|
||||
for tmpaddr in await asyncio.get_event_loop().getaddrinfo(manager, None):
|
||||
manageraddresses.append(tmpaddr[4][0])
|
||||
cfm = configmanager.ConfigManager(None)
|
||||
if manager in manager_to_nodemap:
|
||||
@@ -68,7 +68,7 @@ def node_by_manager(manager):
|
||||
if currhm in manageraddresses:
|
||||
manager_to_nodemap[manager] = node
|
||||
return node
|
||||
for curraddr in greendns.getaddrinfo(currhm, None):
|
||||
for curraddr in await asyncio.get_event_loop().getaddrinfo(currhm, None):
|
||||
curraddr = curraddr[4][0]
|
||||
if curraddr in manageraddresses:
|
||||
manager_to_nodemap[manager] = node
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
|
||||
import asyncio
|
||||
import eventlet
|
||||
import confluent.messages as msg
|
||||
import confluent.exceptions as exc
|
||||
import struct
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
# This provides the implementation of locating MAC addresses on ethernet
|
||||
# switches. It is, essentially, a port of 'MacMap.pm' to confluent.
|
||||
# However, there are enhancements.
|
||||
# For one, each switch interrogation is handled in an eventlet 'thread'
|
||||
# For one, each switch interrogation is handled in an coroutine
|
||||
# For another, MAC addresses are checked in the dictionary on every
|
||||
# switch return, rather than waiting for all switches to check in
|
||||
# (which makes it more responsive when there is a missing or bad switch)
|
||||
|
||||
@@ -22,8 +22,8 @@
|
||||
# - One power supply is off.
|
||||
|
||||
import re
|
||||
import eventlet
|
||||
import eventlet.queue as queue
|
||||
import asyncio
|
||||
import confluent.tasks as tasks
|
||||
import confluent.exceptions as exc
|
||||
import confluent.messages as msg
|
||||
import confluent.util as util
|
||||
@@ -44,8 +44,8 @@ def _run_method(method, workers, results, configmanager, nodes, element):
|
||||
nodes, ["switchuser", "switchpass", "secret.hardwaremanagementpassword",
|
||||
"secret.hardwaremanagementuser"], decrypt=True)
|
||||
for node in nodes:
|
||||
workers.add(eventlet.spawn(method, configmanager, creds,
|
||||
node, results, element))
|
||||
workers.add(tasks.spawn(method(configmanager, creds,
|
||||
node, results, element)))
|
||||
|
||||
|
||||
def enos_login(node, configmanager, creds):
|
||||
@@ -91,7 +91,7 @@ def create(nodes, element, configmanager, inputdata):
|
||||
|
||||
|
||||
def retrieve(nodes, element, configmanager, inputdata):
|
||||
results = queue.LightQueue()
|
||||
results = asyncio.Queue()
|
||||
workers = set([])
|
||||
if element == ["power", "state"]:
|
||||
for node in nodes:
|
||||
@@ -112,14 +112,14 @@ def retrieve(nodes, element, configmanager, inputdata):
|
||||
currtimeout = 10
|
||||
while workers:
|
||||
try:
|
||||
datum = results.get(10)
|
||||
datum = await results.get()
|
||||
while datum:
|
||||
if datum:
|
||||
yield datum
|
||||
datum = results.get_nowait()
|
||||
except queue.Empty:
|
||||
except asyncio.QueueEmpty:
|
||||
pass
|
||||
eventlet.sleep(0.001)
|
||||
await asyncio.sleep(0.001)
|
||||
for t in list(workers):
|
||||
if t.dead:
|
||||
workers.discard(t)
|
||||
@@ -128,7 +128,7 @@ def retrieve(nodes, element, configmanager, inputdata):
|
||||
datum = results.get_nowait()
|
||||
if datum:
|
||||
yield datum
|
||||
except queue.Empty:
|
||||
except asyncio.QueueEmpty:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -17,10 +17,10 @@
|
||||
|
||||
try:
|
||||
import confluent.sshutil as sshutil
|
||||
import eventlet
|
||||
import eventlet.green.subprocess as subprocess
|
||||
import confluent.tasks as tasks
|
||||
except ImportError:
|
||||
pass
|
||||
import asyncio
|
||||
import shutil
|
||||
import json
|
||||
import msgpack
|
||||
@@ -40,7 +40,7 @@ class PlayRunner(object):
|
||||
self.complete = False
|
||||
|
||||
def _start_playbooks(self):
|
||||
self.worker = eventlet.spawn(self._really_run_playbooks)
|
||||
self.worker = tasks.spawn(self._really_run_playbooks())
|
||||
|
||||
def get_available_results(self):
|
||||
avail = self.results
|
||||
@@ -79,7 +79,7 @@ class PlayRunner(object):
|
||||
'results': self.get_available_results()
|
||||
}
|
||||
|
||||
def _really_run_playbooks(self):
|
||||
async def _really_run_playbooks(self):
|
||||
global anspypath
|
||||
try:
|
||||
mypath = anspypath
|
||||
@@ -92,21 +92,21 @@ class PlayRunner(object):
|
||||
mypath = anspypath
|
||||
if not mypath:
|
||||
mypath = sys.executable
|
||||
with open(os.devnull, 'w+') as devnull:
|
||||
targnodes = ','.join(self.nodes)
|
||||
for playfilename in self.playfiles:
|
||||
worker = subprocess.Popen(
|
||||
[mypath, __file__, targnodes, playfilename],
|
||||
stdin=devnull, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
stdout, stder = worker.communicate()
|
||||
self.stderr += stder.decode('utf8')
|
||||
current = memoryview(stdout)
|
||||
while len(current):
|
||||
sz = struct.unpack('=q', current[:8])[0]
|
||||
result = msgpack.unpackb(current[8:8+sz], raw=False)
|
||||
self.results.append(result)
|
||||
current = current[8+sz:]
|
||||
targnodes = ','.join(self.nodes)
|
||||
for playfilename in self.playfiles:
|
||||
worker = await asyncio.create_subprocess_exec(
|
||||
mypath, __file__, targnodes, playfilename,
|
||||
stdin=asyncio.subprocess.DEVNULL,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE)
|
||||
stdout, stder = await worker.communicate()
|
||||
self.stderr += stder.decode('utf8')
|
||||
current = memoryview(stdout)
|
||||
while len(current):
|
||||
sz = struct.unpack('=q', current[:8])[0]
|
||||
result = msgpack.unpackb(current[8:8+sz], raw=False)
|
||||
self.results.append(result)
|
||||
current = current[8+sz:]
|
||||
finally:
|
||||
self.complete = True
|
||||
|
||||
|
||||
@@ -366,7 +366,7 @@ async def handle_request(req, make_response, mimetype):
|
||||
pass
|
||||
if needlocalectl:
|
||||
try:
|
||||
langinfo = util.run(['localectl', 'status'])[0].split(b'\n')
|
||||
langinfo = (await util.check_output('localectl', 'status'))[0].split(b'\n')
|
||||
except Exception:
|
||||
langinfo = []
|
||||
for line in langinfo:
|
||||
|
||||
@@ -22,10 +22,10 @@
|
||||
# only by the process owner and such an owner would be able to read a file
|
||||
# anyway. Regardless, it is advisable to 'unset'
|
||||
|
||||
import asyncio
|
||||
import confluent.interface.console as conapi
|
||||
import eventlet
|
||||
import eventlet.green.select as select
|
||||
import eventlet.green.subprocess as subprocess
|
||||
import confluent.tasks as tasks
|
||||
#TODO: ASYNC: port shellmodule over if it matters
|
||||
import fcntl
|
||||
import os
|
||||
import pty
|
||||
@@ -45,7 +45,7 @@ class ExecConsole(conapi.Console):
|
||||
'CONFLUENT_NODE': node,
|
||||
}
|
||||
|
||||
def relaydata(self):
|
||||
async def relaydata(self):
|
||||
while self.subproc is not None:
|
||||
rdylist, _, _ = select.select(
|
||||
(self._master, self.subproc.stderr), (), (),
|
||||
@@ -55,7 +55,7 @@ class ExecConsole(conapi.Console):
|
||||
somedata = os.read(self._master, 128)
|
||||
while somedata:
|
||||
self._datacallback(somedata)
|
||||
eventlet.sleep(0)
|
||||
await asyncio.sleep(0)
|
||||
somedata = os.read(self._master, 128)
|
||||
except OSError as e:
|
||||
if e.errno == 5:
|
||||
@@ -69,7 +69,7 @@ class ExecConsole(conapi.Console):
|
||||
somedata = self.subproc.stderr.read()
|
||||
while somedata:
|
||||
self._datacallback(somedata)
|
||||
eventlet.sleep(0)
|
||||
await asyncio.sleep(0)
|
||||
somedata = self.subproc.stderr.read()
|
||||
except IOError as e:
|
||||
if e.errno != 11:
|
||||
@@ -95,12 +95,12 @@ class ExecConsole(conapi.Console):
|
||||
os.close(slave)
|
||||
fcntl.fcntl(master, fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
fcntl.fcntl(self.subproc.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.readerthread = eventlet.spawn(self.relaydata)
|
||||
self.readerthread = tasks.spawn(self.relaydata())
|
||||
|
||||
def write(self, data):
|
||||
os.write(self._master, data)
|
||||
|
||||
def close(self):
|
||||
async def close(self):
|
||||
try:
|
||||
os.close(self._master)
|
||||
except OSError:
|
||||
@@ -110,7 +110,7 @@ class ExecConsole(conapi.Console):
|
||||
self.subproc.terminate()
|
||||
waittime = 10
|
||||
while self.subproc is not None and self.subproc.poll() is None:
|
||||
eventlet.sleep(1)
|
||||
await asyncio.sleep(1)
|
||||
waittime -= 1
|
||||
if waittime == 0:
|
||||
break
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
# capacity for having a multiple of sessions per node active at a given time
|
||||
|
||||
|
||||
import asyncio
|
||||
import confluent.consoleserver as consoleserver
|
||||
import confluent.exceptions as exc
|
||||
import confluent.messages as msg
|
||||
@@ -28,9 +29,9 @@ activesessions = {}
|
||||
|
||||
_reaper = None
|
||||
|
||||
def reapsessions():
|
||||
async def reapsessions():
|
||||
while True:
|
||||
eventlet.sleep(30)
|
||||
await asyncio.sleep(30)
|
||||
for clientid in activesessions:
|
||||
currcli = activesessions[clientid]
|
||||
for sesshdl in list(currcli):
|
||||
@@ -50,7 +51,7 @@ class _ShellHandler(consoleserver.ConsoleHandler):
|
||||
self.numusers = 0
|
||||
global _reaper
|
||||
if _reaper is None:
|
||||
_reaper = eventlet.spawn(reapsessions)
|
||||
_reaper = tasks.spawn(reapsessions())
|
||||
|
||||
|
||||
def check_collective(self, attrvalue):
|
||||
|
||||
@@ -17,8 +17,8 @@
|
||||
# This provides a simplified wrapper around snmp implementation roughly
|
||||
# mapping to the net-snmp commands
|
||||
|
||||
# net-snmp-python was considered as the API is cleaner, but the ability to
|
||||
# patch pysnmp to have it be eventlet friendly has caused it's selection
|
||||
# net-snmp-python was considered as the API is cleaner, but
|
||||
# want clean asyncio support
|
||||
# This module simplifies the complex hlapi pysnmp interface
|
||||
|
||||
import asyncio
|
||||
|
||||
@@ -6,7 +6,6 @@ import confluent.config.configmanager as cfm
|
||||
import confluent.collective.manager as collective
|
||||
import confluent.util as util
|
||||
import glob
|
||||
import eventlet.green.os as os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
@@ -39,7 +38,7 @@ async def assure_agent():
|
||||
if agent_pid is None:
|
||||
try:
|
||||
agent_starting = True
|
||||
sai = util.run(['ssh-agent'])[0]
|
||||
sai = await util.check_output(['ssh-agent'])[0]
|
||||
for line in sai.split(b'\n'):
|
||||
if b';' not in line:
|
||||
continue
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import asyncio
|
||||
import glob
|
||||
import os
|
||||
import shutil
|
||||
@@ -21,7 +22,7 @@ import tempfile
|
||||
import confluent.sshutil as sshutil
|
||||
import confluent.util as util
|
||||
import confluent.noderange as noderange
|
||||
import eventlet
|
||||
import confluent.tasks as tasks
|
||||
import pwd
|
||||
import grp
|
||||
import sys
|
||||
@@ -194,8 +195,8 @@ def sync_list_to_node(sl, node, suffixes, peerip=None):
|
||||
targip = node
|
||||
if peerip:
|
||||
targip = peerip
|
||||
output, stderr = util.run(
|
||||
['rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip)])
|
||||
output, stderr = util.check_output(
|
||||
'rsync', '-rvLD', targdir + '/', 'root@[{}]:/'.format(targip))
|
||||
except Exception as e:
|
||||
if 'CalledProcessError' not in repr(e):
|
||||
# https://github.com/eventlet/eventlet/issues/413
|
||||
@@ -321,14 +322,14 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]):
|
||||
syncrunners[nodename].wait()
|
||||
else:
|
||||
return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename)
|
||||
syncrunners[nodename] = eventlet.spawn(
|
||||
sync_list_to_node, sl, nodename, suffixes, peerip)
|
||||
syncrunners[nodename] = tasks.spawn(
|
||||
sync_list_to_node(sl, nodename, suffixes, peerip))
|
||||
if not cleaner:
|
||||
cleaner = eventlet.spawn(cleanit)
|
||||
cleaner = tasks.spawn(cleanit())
|
||||
return '202 Queued', 'Background synchronization initiated' # backgrounded
|
||||
|
||||
|
||||
def cleanit():
|
||||
async def cleanit():
|
||||
toreap = {}
|
||||
while True:
|
||||
for nn in list(syncrunners):
|
||||
@@ -345,7 +346,7 @@ def cleanit():
|
||||
toreap[nn] = 1
|
||||
elif nn in toreap:
|
||||
del toreap[nn]
|
||||
eventlet.sleep(30)
|
||||
await asyncio.sleep(30)
|
||||
|
||||
|
||||
def get_syncresult(nodename):
|
||||
|
||||
Reference in New Issue
Block a user