diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index 0c57b5be..50af7671 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -47,8 +47,6 @@ iothread = None # the thread in which all IO will be performed # the nature of things. iothreadready = False # whether io thread is yet ready to work iothreadwaiters = [] # threads waiting for iothreadready -ignoresockets = set() # between 'select' firing and 'recvfrom', a socket - # should be ignored ioqueue = collections.deque([]) selectbreak = None selectdeadline = 0 @@ -78,41 +76,29 @@ def define_worker(): iowaiters = [] timeout = 300 iothreadready = True + while iothreadwaiters: + waiter = iothreadwaiters.pop() + waiter.set() while self.running: - while iothreadwaiters: - waiter = iothreadwaiters.pop() - waiter.set() if timeout < 0: timeout = 0 selectdeadline = _monotonic_time() + timeout - if ignoresockets: - mysockets = [selectbreak[0]] - for pendingsocket in iosockets: - if pendingsocket not in ignoresockets: - mysockets.append(pendingsocket) - else: - mysockets = iosockets + [selectbreak[0]] + mysockets = iosockets + [selectbreak[0]] tmplist, _, _ = select.select(mysockets, (), (), timeout) # pessimistically move out the deadline # doing it this early (before ioqueue is evaluated) # this avoids other threads making a bad assumption # about not having to break into the select selectdeadline = _monotonic_time() + 300 - rdylist = [] - for handle in tmplist: - if handle is selectbreak[0]: - try: # flush all pending requests - while True: - os.read(handle, 1) - except OSError: - # this means an EWOULDBLOCK, ignore that as that - # was the endgame - pass - else: - ignoresockets.add(handle) - rdylist.append(handle) + _io_graball(iosockets) + try: # flush all pending requests + while True: + os.read(selectbreak[0], 1) + except OSError: + # this means an EWOULDBLOCK, ignore that as that + # was the endgame + pass for w in iowaiters: - w[2].append(tuple(rdylist)) w[3].set() iowaiters = [] timeout = 300 @@ -132,8 +118,7 @@ def define_worker(): traceback.print_exc() workitem[3].set() elif workitem[0] == 'wait': - if len(rdylist) > 0: - workitem[2].append(tuple(rdylist)) + if pktqueue: workitem[3].set() else: ltimeout = workitem[1] - _monotonic_time() @@ -154,7 +139,8 @@ def _io_apply(function, args): if not (function == 'wait' and selectdeadline < args): os.write(selectbreak[1], '1') evt.wait() - return result[0] + if result: + return result[0] def _io_sendto(mysocket, packet, sockaddr): @@ -178,7 +164,6 @@ def _io_graball(mysockets): def _io_recvfrom(mysocket, size): mysocket.setblocking(0) - ignoresockets.discard(mysocket) try: return mysocket.recvfrom(size) except socket.error: @@ -198,9 +183,10 @@ def _monotonic_time(): def _poller(timeout=0): - if ignoresockets: + if pktqueue: return True - return _io_apply('wait', timeout + _monotonic_time()) + _io_apply('wait', timeout + _monotonic_time()) + return pktqueue def _aespad(data): @@ -980,11 +966,9 @@ class Session(object): if timeout is None: return 0 if _poller(timeout=timeout): - _io_apply(_io_graball, (iosockets, )) - while len(pktqueue): + while pktqueue: (data, sockaddr, mysocket) = pktqueue.popleft() cls._route_ipmiresponse(sockaddr, data, mysocket) - _io_apply(_io_graball, (iosockets, )) sessionstodel = [] sessionstokeepalive = [] for session, parms in cls.keepalive_sessions.iteritems():