2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-18 17:40:45 +00:00

Switch several node commands to epoll

Improve ability to handle high fanout amounts by
using epoll instead of select.
This commit is contained in:
Jarrod Johnson
2025-05-20 10:39:57 -04:00
parent 11939c4d57
commit 6bebae1d0b
4 changed files with 84 additions and 46 deletions
+20 -11
View File
@@ -68,6 +68,7 @@ def run():
currprocs = 0
all = set([])
poller = select.epoll()
pipedesc = {}
pendingexecs = deque()
exitcode = 0
@@ -102,19 +103,23 @@ def run():
cmdv = ['ssh', sshnode] + cmdvbase + cmdstorun[0]
if currprocs < concurrentprocs:
currprocs += 1
run_cmdv(sshnode, cmdv, all, pipedesc)
run_cmdv(sshnode, cmdv, all, poller, pipedesc)
else:
pendingexecs.append((sshnode, cmdv))
if not all or exitcode:
sys.exit(exitcode)
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
while all:
pernodeout = {}
for r in rdy:
r = r[0]
desc = pipedesc[r]
r = desc['file']
node = desc['node']
data = True
while data and select.select([r], [], [], 0)[0]:
singlepoller = select.epoll()
singlepoller.register(r, select.EPOLLIN)
while data and singlepoller.poll(0):
data = r.readline()
if data:
if desc['type'] == 'stdout':
@@ -131,15 +136,17 @@ def run():
if ret is not None:
exitcode = exitcode | ret
all.discard(r)
poller.unregister(r)
r.close()
if desc['type'] == 'stdout':
if idxbynode[node] < len(cmdstorun):
cmdv = ['ssh', sshnode] + cmdvbase + cmdstorun[idxbynode[node]]
idxbynode[node] += 1
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
elif pendingexecs:
node, cmdv = pendingexecs.popleft()
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller. pipedesc)
singlepoller.close()
for node in sortutil.natural_sort(pernodeout):
for line in pernodeout[node]:
line = client.stringify(line)
@@ -147,19 +154,21 @@ def run():
sys.stdout.write('{0}: {1}'.format(node, line))
sys.stdout.flush()
if all:
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
sys.exit(exitcode)
def run_cmdv(node, cmdv, all, pipedesc):
def run_cmdv(node, cmdv, all, poller, pipedesc):
nopen = subprocess.Popen(
cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
pipedesc[nopen.stdout] = {'node': node, 'popen': nopen,
'type': 'stdout'}
pipedesc[nopen.stderr] = {'node': node, 'popen': nopen,
'type': 'stderr'}
pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen,
'type': 'stdout', 'file': nopen.stdout}
pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen,
'type': 'stderr', 'file': nopen.stderr}
all.add(nopen.stdout)
poller.register(nopen.stdout, select.EPOLLIN)
all.add(nopen.stderr)
poller.register(nopen.stderr, select.EPOLLIN)
if __name__ == '__main__':
+24 -12
View File
@@ -58,6 +58,7 @@ def run():
currprocs = 0
all = set([])
poller = select.epoll()
pipedesc = {}
pendingexecs = deque()
exitcode = 0
@@ -86,21 +87,28 @@ def run():
if currprocs < concurrentprocs:
currprocs += 1
if options.origname:
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
else:
run_cmdv(pingnode, cmdv, all, pipedesc)
run_cmdv(pingnode, cmdv, all, poller, pipedesc)
else:
pendingexecs.append((pingnode, cmdv))
if options.origname:
pendingexecs.append((node, cmdv))
else:
pendingexecs.append((pingnode, cmdv))
if not all or exitcode:
sys.exit(exitcode)
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
while all:
pernodeout = {}
for r in rdy:
r = r[0]
desc = pipedesc[r]
r = desc['file']
node = desc['node']
data = True
while data and select.select([r], [], [], 0)[0]:
singlepoller = select.epoll()
singlepoller.register(r, select.EPOLLIN)
while data and singlepoller.poll(0):
data = r.readline()
if not data:
pop = desc['popen']
@@ -108,6 +116,7 @@ def run():
if ret is not None:
exitcode = exitcode | ret
all.discard(r)
poller.unregister(r)
r.close()
if desc['type'] == 'stdout':
if ret:
@@ -116,7 +125,8 @@ def run():
print('{0}: ping'.format(node))
if pendingexecs:
node, cmdv = pendingexecs.popleft()
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
singlepoller.close()
for node in sortutil.natural_sort(pernodeout):
for line in pernodeout[node]:
line = client.stringify(line)
@@ -126,19 +136,21 @@ def run():
sys.stdout.write('{0}: {1}'.format(node, line))
sys.stdout.flush()
if all:
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
sys.exit(exitcode)
def run_cmdv(node, cmdv, all, pipedesc):
def run_cmdv(node, cmdv, all, poller, pipedesc):
nopen = subprocess.Popen(
cmdv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
pipedesc[nopen.stdout] = {'node': node, 'popen': nopen,
'type': 'stdout'}
pipedesc[nopen.stderr] = {'node': node, 'popen': nopen,
'type': 'stderr'}
pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen,
'type': 'stdout', 'file': nopen.stdout}
pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen,
'type': 'stderr', 'file': nopen.stderr}
all.add(nopen.stdout)
poller.register(nopen.stdout, select.EPOLLIN)
all.add(nopen.stderr)
poller.register(nopen.stderr, select.EPOLLIN)
if __name__ == '__main__':
+20 -12
View File
@@ -1,4 +1,4 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016-2017 Lenovo
@@ -67,6 +67,7 @@ def run():
currprocs = 0
all = set([])
poller = select.epoll()
pipedesc = {}
pendingexecs = deque()
exitcode = 0
@@ -84,19 +85,23 @@ def run():
cmdv = shlex.split(cmd)
if currprocs < concurrentprocs:
currprocs += 1
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
else:
pendingexecs.append((node, cmdv))
if not all or exitcode:
sys.exit(exitcode)
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
while all:
pernodeout = {}
for r in rdy:
r = r[0]
desc = pipedesc[r]
r = desc['file']
node = desc['node']
data = True
while data and select.select([r], [], [], 0)[0]:
singlepoller = select.epoll()
singlepoller.register(r, select.EPOLLIN)
while data and singlepoller.poll(0):
data = r.readline()
if data:
if desc['type'] == 'stdout':
@@ -116,10 +121,12 @@ def run():
if ret is not None:
exitcode = exitcode | ret
all.discard(r)
poller.unregister(r)
r.close()
if desc['type'] == 'stdout' and pendingexecs:
node, cmdv = pendingexecs.popleft()
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
singlepoller.close()
for node in sortutil.natural_sort(pernodeout):
for line in pernodeout[node]:
line = client.stringify(line)
@@ -129,11 +136,11 @@ def run():
sys.stdout.write('{0}: {1}'.format(node, line))
sys.stdout.flush()
if all:
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
sys.exit(exitcode)
def run_cmdv(node, cmdv, all, pipedesc):
def run_cmdv(node, cmdv, all, poller, pipedesc):
try:
nopen = subprocess.Popen(
cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -142,13 +149,14 @@ def run_cmdv(node, cmdv, all, pipedesc):
sys.stderr.write('{0}: Unable to find local executable file "{1}"'.format(node, cmdv[0]))
return
raise
pipedesc[nopen.stdout] = {'node': node, 'popen': nopen,
'type': 'stdout'}
pipedesc[nopen.stderr] = {'node': node, 'popen': nopen,
'type': 'stderr'}
pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen,
'type': 'stdout', 'file': nopen.stdout}
pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen,
'type': 'stderr', 'file': nopen.stderr}
all.add(nopen.stdout)
poller.register(nopen.stdout, select.EPOLLIN)
all.add(nopen.stderr)
poller.register(nopen.stderr, select.EPOLLIN)
if __name__ == '__main__':
run()
+20 -11
View File
@@ -1,4 +1,4 @@
#!/usr/bin/python2
#!/usr/bin/python3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016-2017 Lenovo
@@ -109,6 +109,7 @@ def run():
ex = exp.get('databynode', ())
for node in ex:
cmdparms.append((node, ex[node]['value']))
poller = select.epoll()
for node, cmd in cmdparms:
sshnode = nodemap.get(node, node)
if not isinstance(cmd, str) and not isinstance(cmd, bytes):
@@ -121,19 +122,23 @@ def run():
cmdv += [sshnode, cmd]
if currprocs < concurrentprocs:
currprocs += 1
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
else:
pendingexecs.append((node, cmdv))
if not all or exitcode:
sys.exit(exitcode)
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
while all:
pernodeout = {}
for r in rdy:
r = r[0]
desc = pipedesc[r]
r = desc['file']
node = desc['node']
data = True
while data and select.select([r], [], [], 0)[0]:
singlepoller = select.epoll()
singlepoller.register(r, select.EPOLLIN)
while data and singlepoller.poll(0):
data = r.readline()
if data:
if desc['type'] == 'stdout':
@@ -153,10 +158,12 @@ def run():
if ret is not None:
exitcode = exitcode | ret
all.discard(r)
poller.unregister(r)
r.close()
if desc['type'] == 'stdout' and pendingexecs:
node, cmdv = pendingexecs.popleft()
run_cmdv(node, cmdv, all, pipedesc)
run_cmdv(node, cmdv, all, poller, pipedesc)
singlepoller.close()
for node in sortutil.natural_sort(pernodeout):
for line in pernodeout[node]:
line = client.stringify(line)
@@ -167,19 +174,21 @@ def run():
sys.stdout.write('{0}: {1}'.format(node, line))
sys.stdout.flush()
if all:
rdy, _, _ = select.select(all, [], [], 10)
rdy = poller.poll(10)
sys.exit(exitcode)
def run_cmdv(node, cmdv, all, pipedesc):
def run_cmdv(node, cmdv, all, poller, pipedesc):
nopen = subprocess.Popen(
cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
pipedesc[nopen.stdout] = {'node': node, 'popen': nopen,
'type': 'stdout'}
pipedesc[nopen.stderr] = {'node': node, 'popen': nopen,
'type': 'stderr'}
pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen,
'type': 'stdout', 'file': nopen.stdout}
pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen,
'type': 'stderr', 'file': nopen.stderr}
all.add(nopen.stdout)
all.add(nopen.stderr)
poller.register(nopen.stdout, select.EPOLLIN)
poller.register(nopen.stderr, select.EPOLLIN)
if __name__ == '__main__':