diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index d5256465..7bf86ffe 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -17,7 +17,7 @@ # This represents the low layer message framing portion of IPMI import collections -import fcntl +import ctypes import hashlib import hmac import operator @@ -48,7 +48,6 @@ iothread = None # the thread in which all IO will be performed iothreadready = False # whether io thread is yet ready to work iothreadwaiters = [] # threads waiting for iothreadready ioqueue = collections.deque([]) -selectbreak = None selectdeadline = 0 running = True iosockets = [] # set of iosockets that will be shared amongst Session objects @@ -60,19 +59,15 @@ MAX_BMCS_PER_SOCKET = 64 # no more than this many BMCs will share a socket def define_worker(): class _IOWorker(threading.Thread): def join(self): - global selectbreak Session._cleanup() self.running = False - os.write(selectbreak[1], '1') + iosockets[0].sendto('\x01', ('::1', iosockets[0].getsockname()[1])) super(_IOWorker, self).join() def run(self): self.running = True global iothreadready - global selectbreak global selectdeadline - selectbreak = os.pipe() - fcntl.fcntl(selectbreak[0], fcntl.F_SETFL, os.O_NONBLOCK) iowaiters = [] timeout = 300 iothreadready = True @@ -83,21 +78,13 @@ def define_worker(): if timeout < 0: timeout = 0 selectdeadline = _monotonic_time() + timeout - mysockets = iosockets + [selectbreak[0]] - tmplist, _, _ = select.select(mysockets, (), (), timeout) + tmplist, _, _ = select.select(iosockets, (), (), timeout) # pessimistically move out the deadline # doing it this early (before ioqueue is evaluated) # this avoids other threads making a bad assumption # about not having to break into the select selectdeadline = _monotonic_time() + 300 _io_graball(iosockets) - try: # flush all pending requests - while True: - os.read(selectbreak[0], 1) - except OSError: - # this means an EWOULDBLOCK, ignore that as that - # was the endgame - pass for w in iowaiters: w[3].set() iowaiters = [] @@ -131,16 +118,18 @@ def define_worker(): pktqueue = collections.deque([]) -def _io_apply(function, args): - global selectbreak +def _io_wait(timeout): evt = threading.Event() result = [] - ioqueue.append((function, args, result, evt)) - if not (function == 'wait' and selectdeadline < args): - os.write(selectbreak[1], '1') + deadline = timeout + _monotonic_time() + ioqueue.append(('wait', deadline, result, evt)) + # Unfortunately, at least with eventlet patched threading, the wait() + # is a somewhat busy wait if given a deadline. Workaround by having + # it piggy back on the select() in the io thread, which is a truly + # lazy wait even with eventlet involvement + if deadline < selectdeadline: + iosockets[0].sendto('\x01', ('::1', iosockets[0].getsockname()[1])) evt.wait() - if result: - return result[0] def _io_sendto(mysocket, packet, sockaddr): @@ -158,6 +147,12 @@ def _io_graball(mysockets): rdata = _io_recvfrom(mysocket, 3000) if rdata is None: break + # If the payload is shorter than 4 bytes, it cannot + # be a useful packet. Skip it entirely. + # This applies to the packet sent to self to break + # into the select + if len(rdata[0]) < 4: + continue rdata = rdata + (mysocket,) pktqueue.append(rdata) @@ -169,6 +164,19 @@ def _io_recvfrom(mysocket, size): except socket.error: return None +wintime = None +try: + wintime = ctypes.windll.kernel32.GetTickCount64 +except AttributeError: + pass + +try: + IPPROTO_IPV6 = socket.IPPROTO_IPV6 +except AttributeError: + IPPROTO_IPV6 = 41 # This is the Win32 version of IPPROTO_IPV6, the only + # platform where python *doesn't* have this in socket that pyghmi is + # targetting. + def _monotonic_time(): """Provides a monotonic timer @@ -179,13 +187,15 @@ def _monotonic_time(): # Python does not provide one until 3.3, so we make do # for most OSes, os.times()[4] works well. # for microsoft, GetTickCount64 + if wintime: + return wintime() / 1000.0 return os.times()[4] def _poller(timeout=0): if pktqueue: return True - _io_apply('wait', timeout + _monotonic_time()) + _io_wait(timeout) return pktqueue @@ -278,17 +288,6 @@ class Session(object): global iothread global iothreadready global iosockets - if iothread is None: - initevt = threading.Event() - iothreadwaiters.append(initevt) - _IOWorker = define_worker() - iothread = _IOWorker() - iothread.start() - initevt.wait() - elif not iothreadready: - initevt = threading.Event() - iothreadwaiters.append(initevt) - initevt.wait() # seek for the least used socket. As sessions close, they may free # up slots in seemingly 'full' sockets. This scheme allows those # slots to be recycled @@ -300,15 +299,28 @@ class Session(object): cls.socketpool[sorted_candidates[0][0]] += 1 return sorted_candidates[0][0] # we need a new socket - tmpsocket = _io_apply(socket.socket, - (socket.AF_INET6, socket.SOCK_DGRAM)) # INET6 + tmpsocket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) # INET6 # can do IPv4 if you are nice to it - tmpsocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + tmpsocket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + # Rather than wait until send() to bind, bind now so that we have + # a port number allocated no matter what + tmpsocket.bind(('', 0)) if server is None: cls.socketpool[tmpsocket] = 1 else: tmpsocket.bind(server) iosockets.append(tmpsocket) + if iothread is None: + initevt = threading.Event() + iothreadwaiters.append(initevt) + _IOWorker = define_worker() + iothread = _IOWorker() + iothread.start() + initevt.wait() + elif not iothreadready: + initevt = threading.Event() + iothreadwaiters.append(initevt) + initevt.wait() return tmpsocket def _sync_login(self, response):