mirror of
https://opendev.org/x/pyghmi
synced 2026-05-13 18:04:19 +00:00
Have session.py work better in Windows
Have monotonic_time reworked to use Windows function when available. Eliminate use of an os.pipe to break into the select. When we want to interrupt the select, use the first udp socket as the angle to break in. Additionally, provide a Windows friendly value for IPPROTO_IPV6 should the socket module fail us. Change-Id: I8f566543c8da28eb7ed76f1cb80ff4cb2dcbba96
This commit is contained in:
@@ -17,7 +17,7 @@
|
||||
# This represents the low layer message framing portion of IPMI
|
||||
|
||||
import collections
|
||||
import fcntl
|
||||
import ctypes
|
||||
import hashlib
|
||||
import hmac
|
||||
import operator
|
||||
@@ -48,7 +48,6 @@ iothread = None # the thread in which all IO will be performed
|
||||
iothreadready = False # whether io thread is yet ready to work
|
||||
iothreadwaiters = [] # threads waiting for iothreadready
|
||||
ioqueue = collections.deque([])
|
||||
selectbreak = None
|
||||
selectdeadline = 0
|
||||
running = True
|
||||
iosockets = [] # set of iosockets that will be shared amongst Session objects
|
||||
@@ -60,19 +59,15 @@ MAX_BMCS_PER_SOCKET = 64 # no more than this many BMCs will share a socket
|
||||
def define_worker():
|
||||
class _IOWorker(threading.Thread):
|
||||
def join(self):
|
||||
global selectbreak
|
||||
Session._cleanup()
|
||||
self.running = False
|
||||
os.write(selectbreak[1], '1')
|
||||
iosockets[0].sendto('\x01', ('::1', iosockets[0].getsockname()[1]))
|
||||
super(_IOWorker, self).join()
|
||||
|
||||
def run(self):
|
||||
self.running = True
|
||||
global iothreadready
|
||||
global selectbreak
|
||||
global selectdeadline
|
||||
selectbreak = os.pipe()
|
||||
fcntl.fcntl(selectbreak[0], fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
iowaiters = []
|
||||
timeout = 300
|
||||
iothreadready = True
|
||||
@@ -83,21 +78,13 @@ def define_worker():
|
||||
if timeout < 0:
|
||||
timeout = 0
|
||||
selectdeadline = _monotonic_time() + timeout
|
||||
mysockets = iosockets + [selectbreak[0]]
|
||||
tmplist, _, _ = select.select(mysockets, (), (), timeout)
|
||||
tmplist, _, _ = select.select(iosockets, (), (), timeout)
|
||||
# pessimistically move out the deadline
|
||||
# doing it this early (before ioqueue is evaluated)
|
||||
# this avoids other threads making a bad assumption
|
||||
# about not having to break into the select
|
||||
selectdeadline = _monotonic_time() + 300
|
||||
_io_graball(iosockets)
|
||||
try: # flush all pending requests
|
||||
while True:
|
||||
os.read(selectbreak[0], 1)
|
||||
except OSError:
|
||||
# this means an EWOULDBLOCK, ignore that as that
|
||||
# was the endgame
|
||||
pass
|
||||
for w in iowaiters:
|
||||
w[3].set()
|
||||
iowaiters = []
|
||||
@@ -131,16 +118,18 @@ def define_worker():
|
||||
pktqueue = collections.deque([])
|
||||
|
||||
|
||||
def _io_apply(function, args):
|
||||
global selectbreak
|
||||
def _io_wait(timeout):
|
||||
evt = threading.Event()
|
||||
result = []
|
||||
ioqueue.append((function, args, result, evt))
|
||||
if not (function == 'wait' and selectdeadline < args):
|
||||
os.write(selectbreak[1], '1')
|
||||
deadline = timeout + _monotonic_time()
|
||||
ioqueue.append(('wait', deadline, result, evt))
|
||||
# Unfortunately, at least with eventlet patched threading, the wait()
|
||||
# is a somewhat busy wait if given a deadline. Workaround by having
|
||||
# it piggy back on the select() in the io thread, which is a truly
|
||||
# lazy wait even with eventlet involvement
|
||||
if deadline < selectdeadline:
|
||||
iosockets[0].sendto('\x01', ('::1', iosockets[0].getsockname()[1]))
|
||||
evt.wait()
|
||||
if result:
|
||||
return result[0]
|
||||
|
||||
|
||||
def _io_sendto(mysocket, packet, sockaddr):
|
||||
@@ -158,6 +147,12 @@ def _io_graball(mysockets):
|
||||
rdata = _io_recvfrom(mysocket, 3000)
|
||||
if rdata is None:
|
||||
break
|
||||
# If the payload is shorter than 4 bytes, it cannot
|
||||
# be a useful packet. Skip it entirely.
|
||||
# This applies to the packet sent to self to break
|
||||
# into the select
|
||||
if len(rdata[0]) < 4:
|
||||
continue
|
||||
rdata = rdata + (mysocket,)
|
||||
pktqueue.append(rdata)
|
||||
|
||||
@@ -169,6 +164,19 @@ def _io_recvfrom(mysocket, size):
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
wintime = None
|
||||
try:
|
||||
wintime = ctypes.windll.kernel32.GetTickCount64
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
try:
|
||||
IPPROTO_IPV6 = socket.IPPROTO_IPV6
|
||||
except AttributeError:
|
||||
IPPROTO_IPV6 = 41 # This is the Win32 version of IPPROTO_IPV6, the only
|
||||
# platform where python *doesn't* have this in socket that pyghmi is
|
||||
# targetting.
|
||||
|
||||
|
||||
def _monotonic_time():
|
||||
"""Provides a monotonic timer
|
||||
@@ -179,13 +187,15 @@ def _monotonic_time():
|
||||
# Python does not provide one until 3.3, so we make do
|
||||
# for most OSes, os.times()[4] works well.
|
||||
# for microsoft, GetTickCount64
|
||||
if wintime:
|
||||
return wintime() / 1000.0
|
||||
return os.times()[4]
|
||||
|
||||
|
||||
def _poller(timeout=0):
|
||||
if pktqueue:
|
||||
return True
|
||||
_io_apply('wait', timeout + _monotonic_time())
|
||||
_io_wait(timeout)
|
||||
return pktqueue
|
||||
|
||||
|
||||
@@ -278,17 +288,6 @@ class Session(object):
|
||||
global iothread
|
||||
global iothreadready
|
||||
global iosockets
|
||||
if iothread is None:
|
||||
initevt = threading.Event()
|
||||
iothreadwaiters.append(initevt)
|
||||
_IOWorker = define_worker()
|
||||
iothread = _IOWorker()
|
||||
iothread.start()
|
||||
initevt.wait()
|
||||
elif not iothreadready:
|
||||
initevt = threading.Event()
|
||||
iothreadwaiters.append(initevt)
|
||||
initevt.wait()
|
||||
# seek for the least used socket. As sessions close, they may free
|
||||
# up slots in seemingly 'full' sockets. This scheme allows those
|
||||
# slots to be recycled
|
||||
@@ -300,15 +299,28 @@ class Session(object):
|
||||
cls.socketpool[sorted_candidates[0][0]] += 1
|
||||
return sorted_candidates[0][0]
|
||||
# we need a new socket
|
||||
tmpsocket = _io_apply(socket.socket,
|
||||
(socket.AF_INET6, socket.SOCK_DGRAM)) # INET6
|
||||
tmpsocket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) # INET6
|
||||
# can do IPv4 if you are nice to it
|
||||
tmpsocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
|
||||
tmpsocket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
|
||||
# Rather than wait until send() to bind, bind now so that we have
|
||||
# a port number allocated no matter what
|
||||
tmpsocket.bind(('', 0))
|
||||
if server is None:
|
||||
cls.socketpool[tmpsocket] = 1
|
||||
else:
|
||||
tmpsocket.bind(server)
|
||||
iosockets.append(tmpsocket)
|
||||
if iothread is None:
|
||||
initevt = threading.Event()
|
||||
iothreadwaiters.append(initevt)
|
||||
_IOWorker = define_worker()
|
||||
iothread = _IOWorker()
|
||||
iothread.start()
|
||||
initevt.wait()
|
||||
elif not iothreadready:
|
||||
initevt = threading.Event()
|
||||
iothreadwaiters.append(initevt)
|
||||
initevt.wait()
|
||||
return tmpsocket
|
||||
|
||||
def _sync_login(self, response):
|
||||
|
||||
Reference in New Issue
Block a user