2
0
mirror of https://github.com/xcat2/confluent.git synced 2026-06-16 16:40:54 +00:00
Files
confluent/misc/prepfish.py
T
Jarrod Johnson fc1a16e77f Fix prepfish for XCC3
XCC3 does not do SSDP over the USB host interface.

Workaround by assuming a 'mac - 1' could work.
2026-06-15 10:23:26 -04:00

346 lines
12 KiB
Python

# Copyright 2017 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import ctypes
import fcntl
import json
from select import select
import glob
import hashlib
import socket
import struct
import os
import subprocess
import sys
import time
import ssl
import socket
class IpmiMsg(ctypes.Structure):
_fields_ = [('netfn', ctypes.c_ubyte),
('cmd', ctypes.c_ubyte),
('data_len', ctypes.c_short),
('data', ctypes.POINTER(ctypes.c_ubyte))]
class IpmiSystemInterfaceAddr(ctypes.Structure):
_fields_ = [('addr_type', ctypes.c_int),
('channel', ctypes.c_short),
('lun', ctypes.c_ubyte)]
class IpmiRecv(ctypes.Structure):
_fields_ = [('recv_type', ctypes.c_int),
('addr', ctypes.POINTER(IpmiSystemInterfaceAddr)),
('addr_len', ctypes.c_uint),
('msgid', ctypes.c_long),
('msg', IpmiMsg)]
class IpmiReq(ctypes.Structure):
_fields_ = [('addr', ctypes.POINTER(IpmiSystemInterfaceAddr)),
('addr_len', ctypes.c_uint),
('msgid', ctypes.c_long),
('msg', IpmiMsg)]
_IONONE = 0
_IOWRITE = 1
_IOREAD = 2
IPMICTL_SET_MY_ADDRESS_CMD = (
_IOREAD << 30 | ctypes.sizeof(ctypes.c_uint) << 16
| ord('i') << 8 | 17) # from ipmi.h
IPMICTL_SEND_COMMAND = (
_IOREAD << 30 | ctypes.sizeof(IpmiReq) << 16
| ord('i') << 8 | 13) # from ipmi.h
# next is really IPMICTL_RECEIVE_MSG_TRUNC, but will only use that
IPMICTL_RECV = (
(_IOWRITE | _IOREAD) << 30 | ctypes.sizeof(IpmiRecv) << 16
| ord('i') << 8 | 11) # from ipmi.h
BMC_SLAVE_ADDR = ctypes.c_uint(0x20)
CURRCHAN = 0xf
ADDRTYPE = 0xc
def get_nicname_from_dmi():
for fi in glob.glob('/sys/firmware/dmi/entries/42-*/raw'):
dmit = memoryview(open(fi, 'rb').read())
if dmit[0] != 42:
continue
if dmit[1] < 0xb:
continue
if dmit[4] != 0x40: # only supporting network host interface
continue
ifdatalen = dmit[5]
ifdata = dmit[6:6+ifdatalen]
if ifdata[0] != 2:
continue
idvend, idprod = struct.unpack('<HH', ifdata[1:5])
for nici in glob.glob('/sys/class/net/*'):
nicname = os.path.basename(nici)
try:
nicu = subprocess.check_output(['udevadm', 'info', nici], stderr=subprocess.DEVNULL)
except Exception:
raise
continue
nicu = nicu.decode()
if f'ID_VENDOR_ID={idvend:04x}' in nicu and f'ID_MODEL_ID={idprod:04x}' in nicu:
return nicname
return None
def scan_nicname(nicname):
idx = int(open('/sys/class/net/{}/ifindex'.format(nicname)).read())
return scan_nic(idx)
def scan_nic(nicidx):
iplinkinfo = subprocess.check_output(['ip', '-j', 'link'], stderr=subprocess.DEVNULL)
iplinkinfo = json.loads(iplinkinfo.decode())
lilkelylla = None
for link in iplinkinfo:
if link['ifindex'] == nicidx:
mac = link['address']
mac_int = int(mac.replace(':', ''), 16)
mac_int -= 1
mac_int ^= 0b10 << 40
machex = '{:012x}'.format(mac_int)
eui64 = machex[:6] + 'fffe' + machex[6:]
eui64 = ':'.join(eui64[i:i+4] for i in range(0, len(eui64), 4))
likelylla = 'fe80::{}%{}'.format(eui64, nicidx)
try:
with socket.create_connection((likelylla, 443), timeout=1):
return likelylla
except Exception:
pass
srvs = {}
s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
s6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
s6.bind(('::', 0))
msg = b'M-SEARCH * HTTP/1.1\r\nHOST: [ff02::c]:1900\r\nMAN: "ssdp:discover"\r\nST: urn:dmtf-org:service:redfish-rest:1\r\nMX: 3\r\n\r\n'
s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, nicidx)
x = [False,]
tries=5
while not x[0] and tries:
try:
s6.sendto(msg, ('ff02::c', 1900))
except Exception:
pass
if likelylla:
try:
with socket.create_connection((likelylla, 443), timeout=1):
return likelylla
except Exception:
pass
x = select((s6,), (), (), 2.0)
tries -= 1
if not x[0]:
raise Exception("Unable to find XCC via SSDP on {}".format(nicidx))
(rsp, peer) = s6.recvfrom(9000)
if '%' in peer[0]:
return peer[0]
else:
return '{}%{}'.format(peer[0], nicidx)
class Session(object):
def __init__(self, devnode='/dev/ipmi0'):
"""Create a local session inband
:param: devnode: The path to the ipmi device
"""
self.ipmidev = open(devnode, 'r+')
fcntl.ioctl(self.ipmidev, IPMICTL_SET_MY_ADDRESS_CMD, BMC_SLAVE_ADDR)
# the interface is initted, create some reusable memory for our session
self.databuffer = ctypes.create_string_buffer(4096)
self.req = IpmiReq()
self.rsp = IpmiRecv()
self.addr = IpmiSystemInterfaceAddr()
self.req.msg.data = ctypes.cast(
ctypes.addressof(self.databuffer),
ctypes.POINTER(ctypes.c_ubyte))
self.rsp.msg.data = self.req.msg.data
self.userid = None
self.password = None
def await_reply(self):
rd, _, _ = select((self.ipmidev,), (), (), 1)
while not rd:
rd, _, _ = select((self.ipmidev,), (), (), 1)
def pause(self, seconds):
time.sleep(seconds)
@property
def parsed_rsp(self):
response = {'netfn': self.rsp.msg.netfn, 'command': self.rsp.msg.cmd,
'code': bytearray(self.databuffer.raw)[0],
'data': bytearray(
self.databuffer.raw[1:self.rsp.msg.data_len])}
return response
def raw_command(self,
netfn,
command,
data=(),
bridge_request=None,
retry=True,
delay_xmit=None,
timeout=None,
waitall=False, rslun=0):
self.addr.channel = CURRCHAN
self.addr.addr_type = ADDRTYPE
self.addr.lun = rslun
self.req.addr_len = ctypes.sizeof(IpmiSystemInterfaceAddr)
self.req.addr = ctypes.pointer(self.addr)
self.req.msg.netfn = netfn
self.req.msg.cmd = command
if data:
data = memoryview(bytearray(data))
try:
self.databuffer[:len(data)] = data[:len(data)]
except ValueError:
self.databuffer[:len(data)] = data[:len(data)].tobytes()
self.req.msg.data_len = len(data)
fcntl.ioctl(self.ipmidev, IPMICTL_SEND_COMMAND, self.req)
self.await_reply()
self.rsp.msg.data_len = 4096
self.rsp.addr = ctypes.pointer(self.addr)
self.rsp.addr_len = ctypes.sizeof(IpmiSystemInterfaceAddr)
fcntl.ioctl(self.ipmidev, IPMICTL_RECV, self.rsp)
return self.parsed_rsp
class Verifier(object):
def __init__(self, fprint):
self._fprint = fprint
def validate(self, certificate):
return hashlib.sha256(certificate).digest() == self._fprint
def dotwait():
sys.stderr.write('.')
sys.stderr.flush()
time.sleep(0.5)
def disable_host_interface():
s = Session('/dev/ipmi0')
rsp = s.raw_command(netfn=0xc, command=1, data=(1, 0xc1, 0))
def get_redfish_creds():
os.makedirs('/run/redfish', exist_ok=True, mode=0o700)
try:
with open('/run/redfish/credentials', 'rb') as credin:
cred = credin.read()
except FileNotFoundError:
s = Session('/dev/ipmi0')
rsp = s.raw_command(netfn=0x2c, command=2, data=(0x52, 0xa5))
cred = bytes(rsp['data'])
with open('/run/redfish/credentials', 'wb') as credout:
credout.write(cred)
if cred[0] == 0x52:
cred = cred[1:]
creds = cred.split(b'\x00')[:2]
return creds
def get_redfish_fingerprint():
os.makedirs('/run/redfish', exist_ok=True, mode=0o700)
try:
with open('/run/redfish/fingerprint', 'rb') as certin:
fprint = certin.read()
except FileNotFoundError:
s = Session('/dev/ipmi0')
rsp = s.raw_command(0x2c, 1, data=(0x52, 1))
if rsp['data'][:2] == b'\x52\x01':
fprint = rsp['data'][2:]
with open('/run/redfish/fingerprint', 'wb') as printout:
printout.write(fprint)
return fprint
disableusb = False
def disable_host_interface():
s = Session('/dev/ipmi0')
s.raw_command(netfn=0xc, command=1, data=(1, 0xc1, 0))
def enable_host_interface():
global disableusb
loud = False
s = Session('/dev/ipmi0')
rsp = s.raw_command(netfn=0xc, command=2, data=(1, 0xc1, 0, 0))
usbenabled = rsp['data'][1] == 1
if not usbenabled:
disableusb = True
s.raw_command(netfn=0xc, command=1, data=(1, 0xc1, 1))
loud = True
sys.stderr.write("Enabling USB Interface")
rsp = s.raw_command(netfn=0xc, command=2, data=(1, 0xc1, 0, 0))
usbenabled = rsp['data'][1] == 1
while not usbenabled:
dotwait()
rsp = s.raw_command(netfn=0xc, command=2, data=(1, 0xc1, 0, 0))
usbenabled = rsp['data'][1] == 1
usbnic = get_nicname_from_dmi()
while not usbnic:
dotwait()
usbnic = get_nicname_from_dmi()
subprocess.check_call(['ip', 'link', 'set', usbnic, 'up'])
bmctarg = scan_nicname(usbnic)
while not bmctarg:
dotwait()
bmctarg = scan_nicname(usbnic)
if loud:
sys.stderr.write("USB NIC Established\n")
sys.stderr.flush()
return bmctarg
def store_redfish_cert(bmc):
fprint = get_redfish_fingerprint()
verifier = Verifier(fprint)
peercert = None
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection((bmc, 443)) as plainsock:
finsock = context.wrap_socket(plainsock, server_hostname=bmc)
peercert = finsock.getpeercert(binary_form=True)
if not verifier.validate(peercert):
raise Exception("Mismatched certificate")
if peercert:
with open('/run/redfish/cert.der', 'wb') as certout:
certout.write(peercert)
def main():
get_redfish_fingerprint()
bmcuser, bmcpass = get_redfish_creds()
bmc = enable_host_interface()
store_redfish_cert(bmc)
if 'checkip' in sys.argv[0]:
nicinfo = subprocess.check_output(['curl', '-sku', '{0}:{1}'.format(bmcuser.decode(), bmcpass.decode()), 'https://[{}]/redfish/v1/Managers/1/EthernetInterfaces/NIC'.format(bmc)], stderr=subprocess.DEVNULL)
niconfo = json.loads(nicinfo.decode())
print(niconfo['IPv4Addresses'][0]['Address'])
if disableusb:
disable_host_interface()
else:
print('Redfish user: {}'.format(bmcuser.decode()))
print('Redfish password: {}'.format(bmcpass.decode()))
print('Redfish host: https://[{}]/'.format(bmc))
if __name__ == '__main__':
main()