diff --git a/confluent_client/bin/nodeapply b/confluent_client/bin/nodeapply index 2e798742..bf4b9a53 100755 --- a/confluent_client/bin/nodeapply +++ b/confluent_client/bin/nodeapply @@ -68,6 +68,7 @@ def run(): currprocs = 0 all = set([]) + poller = select.epoll() pipedesc = {} pendingexecs = deque() exitcode = 0 @@ -102,19 +103,23 @@ def run(): cmdv = ['ssh', sshnode] + cmdvbase + cmdstorun[0] if currprocs < concurrentprocs: currprocs += 1 - run_cmdv(sshnode, cmdv, all, pipedesc) + run_cmdv(sshnode, cmdv, all, poller, pipedesc) else: pendingexecs.append((sshnode, cmdv)) if not all or exitcode: sys.exit(exitcode) - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) while all: pernodeout = {} for r in rdy: + r = r[0] desc = pipedesc[r] + r = desc['file'] node = desc['node'] data = True - while data and select.select([r], [], [], 0)[0]: + singlepoller = select.epoll() + singlepoller.register(r, select.EPOLLIN) + while data and singlepoller.poll(0): data = r.readline() if data: if desc['type'] == 'stdout': @@ -131,15 +136,17 @@ def run(): if ret is not None: exitcode = exitcode | ret all.discard(r) + poller.unregister(r) r.close() if desc['type'] == 'stdout': if idxbynode[node] < len(cmdstorun): cmdv = ['ssh', sshnode] + cmdvbase + cmdstorun[idxbynode[node]] idxbynode[node] += 1 - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) elif pendingexecs: node, cmdv = pendingexecs.popleft() - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller. pipedesc) + singlepoller.close() for node in sortutil.natural_sort(pernodeout): for line in pernodeout[node]: line = client.stringify(line) @@ -147,19 +154,21 @@ def run(): sys.stdout.write('{0}: {1}'.format(node, line)) sys.stdout.flush() if all: - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) sys.exit(exitcode) -def run_cmdv(node, cmdv, all, pipedesc): +def run_cmdv(node, cmdv, all, poller, pipedesc): nopen = subprocess.Popen( cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, - 'type': 'stdout'} - pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, - 'type': 'stderr'} + pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stdout', 'file': nopen.stdout} + pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stderr', 'file': nopen.stderr} all.add(nopen.stdout) + poller.register(nopen.stdout, select.EPOLLIN) all.add(nopen.stderr) + poller.register(nopen.stderr, select.EPOLLIN) if __name__ == '__main__': diff --git a/confluent_client/bin/nodeping b/confluent_client/bin/nodeping index 1140a6bd..25d60c81 100755 --- a/confluent_client/bin/nodeping +++ b/confluent_client/bin/nodeping @@ -58,6 +58,7 @@ def run(): currprocs = 0 all = set([]) + poller = select.epoll() pipedesc = {} pendingexecs = deque() exitcode = 0 @@ -86,21 +87,28 @@ def run(): if currprocs < concurrentprocs: currprocs += 1 if options.origname: - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) else: - run_cmdv(pingnode, cmdv, all, pipedesc) + run_cmdv(pingnode, cmdv, all, poller, pipedesc) else: - pendingexecs.append((pingnode, cmdv)) + if options.origname: + pendingexecs.append((node, cmdv)) + else: + pendingexecs.append((pingnode, cmdv)) if not all or exitcode: sys.exit(exitcode) - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) while all: pernodeout = {} for r in rdy: + r = r[0] desc = pipedesc[r] + r = desc['file'] node = desc['node'] data = True - while data and select.select([r], [], [], 0)[0]: + singlepoller = select.epoll() + singlepoller.register(r, select.EPOLLIN) + while data and singlepoller.poll(0): data = r.readline() if not data: pop = desc['popen'] @@ -108,6 +116,7 @@ def run(): if ret is not None: exitcode = exitcode | ret all.discard(r) + poller.unregister(r) r.close() if desc['type'] == 'stdout': if ret: @@ -116,7 +125,8 @@ def run(): print('{0}: ping'.format(node)) if pendingexecs: node, cmdv = pendingexecs.popleft() - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) + singlepoller.close() for node in sortutil.natural_sort(pernodeout): for line in pernodeout[node]: line = client.stringify(line) @@ -126,19 +136,21 @@ def run(): sys.stdout.write('{0}: {1}'.format(node, line)) sys.stdout.flush() if all: - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) sys.exit(exitcode) -def run_cmdv(node, cmdv, all, pipedesc): +def run_cmdv(node, cmdv, all, poller, pipedesc): nopen = subprocess.Popen( cmdv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, - 'type': 'stdout'} - pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, - 'type': 'stderr'} + pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stdout', 'file': nopen.stdout} + pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stderr', 'file': nopen.stderr} all.add(nopen.stdout) + poller.register(nopen.stdout, select.EPOLLIN) all.add(nopen.stderr) + poller.register(nopen.stderr, select.EPOLLIN) if __name__ == '__main__': diff --git a/confluent_client/bin/noderun b/confluent_client/bin/noderun index df0d4a77..c3367394 100755 --- a/confluent_client/bin/noderun +++ b/confluent_client/bin/noderun @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2016-2017 Lenovo @@ -67,6 +67,7 @@ def run(): currprocs = 0 all = set([]) + poller = select.epoll() pipedesc = {} pendingexecs = deque() exitcode = 0 @@ -84,19 +85,23 @@ def run(): cmdv = shlex.split(cmd) if currprocs < concurrentprocs: currprocs += 1 - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) else: pendingexecs.append((node, cmdv)) if not all or exitcode: sys.exit(exitcode) - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) while all: pernodeout = {} for r in rdy: + r = r[0] desc = pipedesc[r] + r = desc['file'] node = desc['node'] data = True - while data and select.select([r], [], [], 0)[0]: + singlepoller = select.epoll() + singlepoller.register(r, select.EPOLLIN) + while data and singlepoller.poll(0): data = r.readline() if data: if desc['type'] == 'stdout': @@ -116,10 +121,12 @@ def run(): if ret is not None: exitcode = exitcode | ret all.discard(r) + poller.unregister(r) r.close() if desc['type'] == 'stdout' and pendingexecs: node, cmdv = pendingexecs.popleft() - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) + singlepoller.close() for node in sortutil.natural_sort(pernodeout): for line in pernodeout[node]: line = client.stringify(line) @@ -129,11 +136,11 @@ def run(): sys.stdout.write('{0}: {1}'.format(node, line)) sys.stdout.flush() if all: - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) sys.exit(exitcode) -def run_cmdv(node, cmdv, all, pipedesc): +def run_cmdv(node, cmdv, all, poller, pipedesc): try: nopen = subprocess.Popen( cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -142,13 +149,14 @@ def run_cmdv(node, cmdv, all, pipedesc): sys.stderr.write('{0}: Unable to find local executable file "{1}"'.format(node, cmdv[0])) return raise - pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, - 'type': 'stdout'} - pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, - 'type': 'stderr'} + pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stdout', 'file': nopen.stdout} + pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stderr', 'file': nopen.stderr} all.add(nopen.stdout) + poller.register(nopen.stdout, select.EPOLLIN) all.add(nopen.stderr) - + poller.register(nopen.stderr, select.EPOLLIN) if __name__ == '__main__': run() diff --git a/confluent_client/bin/nodeshell b/confluent_client/bin/nodeshell index f22c1993..90ab89d9 100755 --- a/confluent_client/bin/nodeshell +++ b/confluent_client/bin/nodeshell @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2016-2017 Lenovo @@ -109,6 +109,7 @@ def run(): ex = exp.get('databynode', ()) for node in ex: cmdparms.append((node, ex[node]['value'])) + poller = select.epoll() for node, cmd in cmdparms: sshnode = nodemap.get(node, node) if not isinstance(cmd, str) and not isinstance(cmd, bytes): @@ -121,19 +122,23 @@ def run(): cmdv += [sshnode, cmd] if currprocs < concurrentprocs: currprocs += 1 - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) else: pendingexecs.append((node, cmdv)) if not all or exitcode: sys.exit(exitcode) - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) while all: pernodeout = {} for r in rdy: + r = r[0] desc = pipedesc[r] + r = desc['file'] node = desc['node'] data = True - while data and select.select([r], [], [], 0)[0]: + singlepoller = select.epoll() + singlepoller.register(r, select.EPOLLIN) + while data and singlepoller.poll(0): data = r.readline() if data: if desc['type'] == 'stdout': @@ -153,10 +158,12 @@ def run(): if ret is not None: exitcode = exitcode | ret all.discard(r) + poller.unregister(r) r.close() if desc['type'] == 'stdout' and pendingexecs: node, cmdv = pendingexecs.popleft() - run_cmdv(node, cmdv, all, pipedesc) + run_cmdv(node, cmdv, all, poller, pipedesc) + singlepoller.close() for node in sortutil.natural_sort(pernodeout): for line in pernodeout[node]: line = client.stringify(line) @@ -167,19 +174,21 @@ def run(): sys.stdout.write('{0}: {1}'.format(node, line)) sys.stdout.flush() if all: - rdy, _, _ = select.select(all, [], [], 10) + rdy = poller.poll(10) sys.exit(exitcode) -def run_cmdv(node, cmdv, all, pipedesc): +def run_cmdv(node, cmdv, all, poller, pipedesc): nopen = subprocess.Popen( cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, - 'type': 'stdout'} - pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, - 'type': 'stderr'} + pipedesc[nopen.stdout.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stdout', 'file': nopen.stdout} + pipedesc[nopen.stderr.fileno()] = {'node': node, 'popen': nopen, + 'type': 'stderr', 'file': nopen.stderr} all.add(nopen.stdout) all.add(nopen.stderr) + poller.register(nopen.stdout, select.EPOLLIN) + poller.register(nopen.stderr, select.EPOLLIN) if __name__ == '__main__':