From 02c058ee065c5c21e5af11ad2eec461533e4a50a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 18 Jun 2015 08:48:48 -0400 Subject: [PATCH] Have session.py work better in Windows Have monotonic_time reworked to use Windows function when available. Eliminate use of an os.pipe to break into the select. When we want to interrupt the select, use the first udp socket as the angle to break in. Additionally, provide a Windows friendly value for IPPROTO_IPV6 should the socket module fail us. Change-Id: I8f566543c8da28eb7ed76f1cb80ff4cb2dcbba96 --- pyghmi/ipmi/private/session.py | 88 +++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index d5256465..7bf86ffe 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -17,7 +17,7 @@ # This represents the low layer message framing portion of IPMI import collections -import fcntl +import ctypes import hashlib import hmac import operator @@ -48,7 +48,6 @@ iothread = None # the thread in which all IO will be performed iothreadready = False # whether io thread is yet ready to work iothreadwaiters = [] # threads waiting for iothreadready ioqueue = collections.deque([]) -selectbreak = None selectdeadline = 0 running = True iosockets = [] # set of iosockets that will be shared amongst Session objects @@ -60,19 +59,15 @@ MAX_BMCS_PER_SOCKET = 64 # no more than this many BMCs will share a socket def define_worker(): class _IOWorker(threading.Thread): def join(self): - global selectbreak Session._cleanup() self.running = False - os.write(selectbreak[1], '1') + iosockets[0].sendto('\x01', ('::1', iosockets[0].getsockname()[1])) super(_IOWorker, self).join() def run(self): self.running = True global iothreadready - global selectbreak global selectdeadline - selectbreak = os.pipe() - fcntl.fcntl(selectbreak[0], fcntl.F_SETFL, os.O_NONBLOCK) iowaiters = [] timeout = 300 iothreadready = True @@ -83,21 +78,13 @@ def define_worker(): if timeout < 0: timeout = 0 selectdeadline = _monotonic_time() + timeout - mysockets = iosockets + [selectbreak[0]] - tmplist, _, _ = select.select(mysockets, (), (), timeout) + tmplist, _, _ = select.select(iosockets, (), (), timeout) # pessimistically move out the deadline # doing it this early (before ioqueue is evaluated) # this avoids other threads making a bad assumption # about not having to break into the select selectdeadline = _monotonic_time() + 300 _io_graball(iosockets) - try: # flush all pending requests - while True: - os.read(selectbreak[0], 1) - except OSError: - # this means an EWOULDBLOCK, ignore that as that - # was the endgame - pass for w in iowaiters: w[3].set() iowaiters = [] @@ -131,16 +118,18 @@ def define_worker(): pktqueue = collections.deque([]) -def _io_apply(function, args): - global selectbreak +def _io_wait(timeout): evt = threading.Event() result = [] - ioqueue.append((function, args, result, evt)) - if not (function == 'wait' and selectdeadline < args): - os.write(selectbreak[1], '1') + deadline = timeout + _monotonic_time() + ioqueue.append(('wait', deadline, result, evt)) + # Unfortunately, at least with eventlet patched threading, the wait() + # is a somewhat busy wait if given a deadline. Workaround by having + # it piggy back on the select() in the io thread, which is a truly + # lazy wait even with eventlet involvement + if deadline < selectdeadline: + iosockets[0].sendto('\x01', ('::1', iosockets[0].getsockname()[1])) evt.wait() - if result: - return result[0] def _io_sendto(mysocket, packet, sockaddr): @@ -158,6 +147,12 @@ def _io_graball(mysockets): rdata = _io_recvfrom(mysocket, 3000) if rdata is None: break + # If the payload is shorter than 4 bytes, it cannot + # be a useful packet. Skip it entirely. + # This applies to the packet sent to self to break + # into the select + if len(rdata[0]) < 4: + continue rdata = rdata + (mysocket,) pktqueue.append(rdata) @@ -169,6 +164,19 @@ def _io_recvfrom(mysocket, size): except socket.error: return None +wintime = None +try: + wintime = ctypes.windll.kernel32.GetTickCount64 +except AttributeError: + pass + +try: + IPPROTO_IPV6 = socket.IPPROTO_IPV6 +except AttributeError: + IPPROTO_IPV6 = 41 # This is the Win32 version of IPPROTO_IPV6, the only + # platform where python *doesn't* have this in socket that pyghmi is + # targetting. + def _monotonic_time(): """Provides a monotonic timer @@ -179,13 +187,15 @@ def _monotonic_time(): # Python does not provide one until 3.3, so we make do # for most OSes, os.times()[4] works well. # for microsoft, GetTickCount64 + if wintime: + return wintime() / 1000.0 return os.times()[4] def _poller(timeout=0): if pktqueue: return True - _io_apply('wait', timeout + _monotonic_time()) + _io_wait(timeout) return pktqueue @@ -278,17 +288,6 @@ class Session(object): global iothread global iothreadready global iosockets - if iothread is None: - initevt = threading.Event() - iothreadwaiters.append(initevt) - _IOWorker = define_worker() - iothread = _IOWorker() - iothread.start() - initevt.wait() - elif not iothreadready: - initevt = threading.Event() - iothreadwaiters.append(initevt) - initevt.wait() # seek for the least used socket. As sessions close, they may free # up slots in seemingly 'full' sockets. This scheme allows those # slots to be recycled @@ -300,15 +299,28 @@ class Session(object): cls.socketpool[sorted_candidates[0][0]] += 1 return sorted_candidates[0][0] # we need a new socket - tmpsocket = _io_apply(socket.socket, - (socket.AF_INET6, socket.SOCK_DGRAM)) # INET6 + tmpsocket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) # INET6 # can do IPv4 if you are nice to it - tmpsocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + tmpsocket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + # Rather than wait until send() to bind, bind now so that we have + # a port number allocated no matter what + tmpsocket.bind(('', 0)) if server is None: cls.socketpool[tmpsocket] = 1 else: tmpsocket.bind(server) iosockets.append(tmpsocket) + if iothread is None: + initevt = threading.Event() + iothreadwaiters.append(initevt) + _IOWorker = define_worker() + iothread = _IOWorker() + iothread.start() + initevt.wait() + elif not iothreadready: + initevt = threading.Event() + iothreadwaiters.append(initevt) + initevt.wait() return tmpsocket def _sync_login(self, response):