From f0d3050a79f6cff5464e33d09089b7a47d4dc893 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 18 Feb 2015 10:15:09 -0500 Subject: [PATCH] Streamline and simplify IO Polling Get more performance improvements by moving more of the serialized effort into the IO thread to avoid churn. This also simplifies the issue with select being called without recvfrom, allowing removal of the ignoresockets mechanism. Also rework wait to avoid having to build lists that no one ever consumes and move work out of the eternal loop that only should happen at startup. This has shaved an additional 25% off of wallclock time in a single-processor context for a given workload. Change-Id: If321a69fabfb3ee55599ecfe3d24fbacd33388b5 --- pyghmi/ipmi/private/session.py | 54 ++++++++++++---------------------- 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index 0c57b5be..50af7671 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -47,8 +47,6 @@ iothread = None # the thread in which all IO will be performed # the nature of things. iothreadready = False # whether io thread is yet ready to work iothreadwaiters = [] # threads waiting for iothreadready -ignoresockets = set() # between 'select' firing and 'recvfrom', a socket - # should be ignored ioqueue = collections.deque([]) selectbreak = None selectdeadline = 0 @@ -78,41 +76,29 @@ def define_worker(): iowaiters = [] timeout = 300 iothreadready = True + while iothreadwaiters: + waiter = iothreadwaiters.pop() + waiter.set() while self.running: - while iothreadwaiters: - waiter = iothreadwaiters.pop() - waiter.set() if timeout < 0: timeout = 0 selectdeadline = _monotonic_time() + timeout - if ignoresockets: - mysockets = [selectbreak[0]] - for pendingsocket in iosockets: - if pendingsocket not in ignoresockets: - mysockets.append(pendingsocket) - else: - mysockets = iosockets + [selectbreak[0]] + mysockets = iosockets + [selectbreak[0]] tmplist, _, _ = select.select(mysockets, (), (), timeout) # pessimistically move out the deadline # doing it this early (before ioqueue is evaluated) # this avoids other threads making a bad assumption # about not having to break into the select selectdeadline = _monotonic_time() + 300 - rdylist = [] - for handle in tmplist: - if handle is selectbreak[0]: - try: # flush all pending requests - while True: - os.read(handle, 1) - except OSError: - # this means an EWOULDBLOCK, ignore that as that - # was the endgame - pass - else: - ignoresockets.add(handle) - rdylist.append(handle) + _io_graball(iosockets) + try: # flush all pending requests + while True: + os.read(selectbreak[0], 1) + except OSError: + # this means an EWOULDBLOCK, ignore that as that + # was the endgame + pass for w in iowaiters: - w[2].append(tuple(rdylist)) w[3].set() iowaiters = [] timeout = 300 @@ -132,8 +118,7 @@ def define_worker(): traceback.print_exc() workitem[3].set() elif workitem[0] == 'wait': - if len(rdylist) > 0: - workitem[2].append(tuple(rdylist)) + if pktqueue: workitem[3].set() else: ltimeout = workitem[1] - _monotonic_time() @@ -154,7 +139,8 @@ def _io_apply(function, args): if not (function == 'wait' and selectdeadline < args): os.write(selectbreak[1], '1') evt.wait() - return result[0] + if result: + return result[0] def _io_sendto(mysocket, packet, sockaddr): @@ -178,7 +164,6 @@ def _io_graball(mysockets): def _io_recvfrom(mysocket, size): mysocket.setblocking(0) - ignoresockets.discard(mysocket) try: return mysocket.recvfrom(size) except socket.error: @@ -198,9 +183,10 @@ def _monotonic_time(): def _poller(timeout=0): - if ignoresockets: + if pktqueue: return True - return _io_apply('wait', timeout + _monotonic_time()) + _io_apply('wait', timeout + _monotonic_time()) + return pktqueue def _aespad(data): @@ -980,11 +966,9 @@ class Session(object): if timeout is None: return 0 if _poller(timeout=timeout): - _io_apply(_io_graball, (iosockets, )) - while len(pktqueue): + while pktqueue: (data, sockaddr, mysocket) = pktqueue.popleft() cls._route_ipmiresponse(sockaddr, data, mysocket) - _io_apply(_io_graball, (iosockets, )) sessionstodel = [] sessionstokeepalive = [] for session, parms in cls.keepalive_sessions.iteritems():