2
0
mirror of https://opendev.org/x/pyghmi synced 2026-03-29 06:13:30 +00:00

Streamline and simplify IO Polling

Get more performance improvements by moving more of the
serialized effort into the IO thread to avoid churn.  This
also simplifies the issue with select being called without
recvfrom, allowing removal of the ignoresockets mechanism.
Also rework wait to avoid having to build lists that no one
ever consumes and move work out of the eternal loop
that only should happen at startup.  This has shaved an
additional 25% off of wallclock time in a single-processor
context for a given workload.

Change-Id: If321a69fabfb3ee55599ecfe3d24fbacd33388b5
This commit is contained in:
Jarrod Johnson
2015-02-18 10:15:09 -05:00
parent b779379511
commit f0d3050a79

View File

@@ -47,8 +47,6 @@ iothread = None # the thread in which all IO will be performed
# the nature of things.
iothreadready = False # whether io thread is yet ready to work
iothreadwaiters = [] # threads waiting for iothreadready
ignoresockets = set() # between 'select' firing and 'recvfrom', a socket
# should be ignored
ioqueue = collections.deque([])
selectbreak = None
selectdeadline = 0
@@ -78,41 +76,29 @@ def define_worker():
iowaiters = []
timeout = 300
iothreadready = True
while iothreadwaiters:
waiter = iothreadwaiters.pop()
waiter.set()
while self.running:
while iothreadwaiters:
waiter = iothreadwaiters.pop()
waiter.set()
if timeout < 0:
timeout = 0
selectdeadline = _monotonic_time() + timeout
if ignoresockets:
mysockets = [selectbreak[0]]
for pendingsocket in iosockets:
if pendingsocket not in ignoresockets:
mysockets.append(pendingsocket)
else:
mysockets = iosockets + [selectbreak[0]]
mysockets = iosockets + [selectbreak[0]]
tmplist, _, _ = select.select(mysockets, (), (), timeout)
# pessimistically move out the deadline
# doing it this early (before ioqueue is evaluated)
# this avoids other threads making a bad assumption
# about not having to break into the select
selectdeadline = _monotonic_time() + 300
rdylist = []
for handle in tmplist:
if handle is selectbreak[0]:
try: # flush all pending requests
while True:
os.read(handle, 1)
except OSError:
# this means an EWOULDBLOCK, ignore that as that
# was the endgame
pass
else:
ignoresockets.add(handle)
rdylist.append(handle)
_io_graball(iosockets)
try: # flush all pending requests
while True:
os.read(selectbreak[0], 1)
except OSError:
# this means an EWOULDBLOCK, ignore that as that
# was the endgame
pass
for w in iowaiters:
w[2].append(tuple(rdylist))
w[3].set()
iowaiters = []
timeout = 300
@@ -132,8 +118,7 @@ def define_worker():
traceback.print_exc()
workitem[3].set()
elif workitem[0] == 'wait':
if len(rdylist) > 0:
workitem[2].append(tuple(rdylist))
if pktqueue:
workitem[3].set()
else:
ltimeout = workitem[1] - _monotonic_time()
@@ -154,7 +139,8 @@ def _io_apply(function, args):
if not (function == 'wait' and selectdeadline < args):
os.write(selectbreak[1], '1')
evt.wait()
return result[0]
if result:
return result[0]
def _io_sendto(mysocket, packet, sockaddr):
@@ -178,7 +164,6 @@ def _io_graball(mysockets):
def _io_recvfrom(mysocket, size):
mysocket.setblocking(0)
ignoresockets.discard(mysocket)
try:
return mysocket.recvfrom(size)
except socket.error:
@@ -198,9 +183,10 @@ def _monotonic_time():
def _poller(timeout=0):
if ignoresockets:
if pktqueue:
return True
return _io_apply('wait', timeout + _monotonic_time())
_io_apply('wait', timeout + _monotonic_time())
return pktqueue
def _aespad(data):
@@ -980,11 +966,9 @@ class Session(object):
if timeout is None:
return 0
if _poller(timeout=timeout):
_io_apply(_io_graball, (iosockets, ))
while len(pktqueue):
while pktqueue:
(data, sockaddr, mysocket) = pktqueue.popleft()
cls._route_ipmiresponse(sockaddr, data, mysocket)
_io_apply(_io_graball, (iosockets, ))
sessionstodel = []
sessionstokeepalive = []
for session, parms in cls.keepalive_sessions.iteritems():