文章目录
- 1.编译最新的u-boot 2019.10,linux-kernel 5.3.6并用busybox打包根文件系统:成功在全志H5芯片上启动起来
- 2.读写光模块瓦数和温度:SFP类型的光模块控制:支持对TX Fault,MOD_ABS,RX_LOS,TX Disable信号的实时电平读取;支持对TX Disable信号的电平控制
- 3. tlv_tool.py:\xf0是二进制数据
1.编译最新的u-boot 2019.10,linux-kernel 5.3.6并用busybox打包根文件系统:成功在全志H5芯片上启动起来
1.1 u-boot:…_defconfig文件生成.config文件再make编译
如下是gcc用来编译uboot。
nano .bashrc
执行如上后,把SD卡从读卡器拔下插到派上。
1.2 kernel:先make config再make生成文件拷进去
如下.dtb就是设备树文件,是编译出来的,.dts是原文件。
1.3 busybox:先make config再make,make install,创建文件
如上创建了文件夹,把所有东西打包进来。
如上拔出SD卡,插入到派上。
2.读写光模块瓦数和温度:SFP类型的光模块控制:支持对TX Fault,MOD_ABS,RX_LOS,TX Disable信号的实时电平读取;支持对TX Disable信号的电平控制
# !/usr/bin/env python
# coding:UTF-8
# sfp.py
import sys
import datetime
import time
import math
import json
import re
import os
from os.path import dirname, abspath
from subprocess import PIPE, Popen
SFP_TYPE_CODE_LIST = [
'03', # SFP/SFP+/SFP28
'1b'
]
QSFP_TYPE_CODE_LIST = [
'0d', # QSFP+ or later
'11' # QSFP28 or later
]
QSFP_DD_TYPE_CODE_LIST = [
'18', # QSFP-DD Double Density 8X Pluggable Transceiver
'1e'
]
SFP_TYPE = "SFP"
QSFP_TYPE = "QSFP"
QSFP_DD_TYPE = "QSFP_DD"
sfp_type = None
operate = None
#Project_path = dirname(dirname(dirname(abspath(__file__))))
#Conf_path = "%s/cfg_sfp.json"%Project_path
Conf_path = "./cfg_sfp.json"
# " QSFP 1.5W,2.0W,2.5W,3.0W,3.5W,4.0W,4.5W,5.0W,5.5W,6.0W\n"
# " QSFP-DD 1.5W,3.5W,5.0W,7.0W,10.0W,12.0W,14.0W,16.0W\n"
QSFP_W_dict = {
1.5 : ["0x00","0x00"],
2.0 : ["0x01","0x00"],
2.5 : ["0x02","0x00"],
3.0 : ["0x04","0x00"],
3.5 : ["0x08","0x00"],
4.0 : ["0x10","0x00"],
4.5 : ["0x20","0x00"],
5.0 : ["0x40","0x00"],
5.5 : ["0x80","0x00"],
6.0 : ["0x80","0x08"],
}
QSFPDD_W_dict = {
1.5 : 0,
3.5 : 1,
5.0 : 2,
7.0 : 3,
10.0 : 4,
12.0 : 5,
14.0 : 6,
16.0 : 7,
}
SFP_W_dict = {
0.0 : 0,
0.5 : 1,
1.0 : 2,
1.5 : 3,
2.0 : 4,
2.5 : 5,
3.0 : 6,
3.5 : 7,
4.0 : 8,
4.5 : 9,
5.0 : 10,
5.5 : 11,
6.0 : 12,
6.5 : 13,
7.0 : 14
}
def ParseJson(filename=Conf_path):
with open(filename, 'r') as f:
jdata = json.load(f)
return jdata
def Run(cmd, retype="r"):
'''Run System Command and Return Command Stdout Object'''
try:
with Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, encoding="utf-8") as f:
Ret_Type = {"r": f.stdout.read, "rl": f.stdout.readline, "rls" : f.stdout.readlines, "rc": f.wait}
if retype == 're':
return f.stdout.read() + f.stderr.read()
return Ret_Type[retype]()
except Exception as e:
print("\033[31mExecute Err:%s\033[0m"%e)
def _read_eeprom_specific_bytes(): #读eeprom判断是什么类型SFP
global sfp_type
sysfsfile_eeprom = None
eeprom_raw = []
eeprom_list1 = []
port_str = ParseJson()["current_port"]
port_conf = ParseJson()[port_str]
port = int(sys.argv[2])
i2c_bus_offset = int(port_conf["i2c_bus_offset"])
i2c_bus = port + i2c_bus_offset
for p in ParseJson()["sfp_type"]:
i2c_addr = ParseJson()[p]["i2c_addr"]
device_node = str(i2c_bus) + "-" + i2c_addr[2:].zfill(4)
eeprom_path = "/sys/bus/i2c/devices/" + device_node + "/eeprom"
eeprom_list1.append(eeprom_path)
eeprom_list = list(set(eeprom_list1))
for i in range(0, 50):
eeprom_raw.append("0x00")
print(eeprom_list)
# eeprom_path = "/sys/bus/i2c/devices/67-0050/eeprom"
for eeprom_path in eeprom_list:
try:
sysfsfile_eeprom = open(eeprom_path, mode="rb", buffering=0)
sysfsfile_eeprom.seek(0)
raw = sysfsfile_eeprom.read(5)
for n in range(0, 5):
#eeprom_raw[n] = hex(ord(raw[n]))[2:].zfill(2)
eeprom_raw[n] = raw[n]
# print("data = %x"%eeprom_raw[n])
a = hex(raw[0])[2:]
print("first data = %s" %(a))
if a in QSFP_TYPE_CODE_LIST:
sfp_type = QSFP_TYPE
elif a in SFP_TYPE_CODE_LIST:
sfp_type = SFP_TYPE
elif a in QSFP_DD_TYPE_CODE_LIST:
sfp_type = QSFP_DD_TYPE
except Exception as e:
print(e)
finally:
if sysfsfile_eeprom:
sysfsfile_eeprom.close()
return eeprom_raw
def wattage_operation():
global sfp_type
global operate
conf = ParseJson()
port = int(sys.argv[2]) # python3 sfp.py get 1(哪个端口)
port_str = ParseJson()["current_port"]
port_conf = ParseJson()[port_str]
i2c_bus_offset = int(port_conf["i2c_bus_offset"])
QSFP_wattage_reg = conf["qsfp"]["wattage_reg"]
SFP_wattage_reg = conf["sfp"]["wattage_reg"]
QSFPDD_wattage_reg = conf["qsfp_dd"]["wattage_reg"]
if sfp_type == QSFP_TYPE:
i2c_addr = conf["qsfp"]["i2c_addr"]
print("This SFP type: %s " %QSFP_TYPE)
elif sfp_type == SFP_TYPE:
i2c_addr = conf["sfp"]["i2c_addr"]
print("This SFP type: %s " %SFP_TYPE)
elif sfp_type == QSFP_DD_TYPE:
i2c_addr = conf["qsfp_dd"]["i2c_addr"]
print("This SFP type: %s " %QSFP_DD_TYPE)
else:
print("This SFP type is not found")
i2c_bus = port + i2c_bus_offset
if operate == 'read':
if sfp_type == QSFP_TYPE:
reg_data = []
for i in QSFP_wattage_reg:
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + i
print(cmd)
output = Run(cmd)
print("output=%s" %output)
reg_data.append(output.replace('\n',''))
for k,val in QSFP_W_dict.items():
if val == reg_data:
print("This QSFP module wattage : %0.1fW" %k)
temper = conf["qsfp"]["temper_reg"]
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + temper[0]
output1 = Run(cmd)
#print(output1)
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + temper[1]
output2 = Run(cmd)
#print(output2)
temp = (int(output1.replace('\n',''),16) << 8) + int(output2.replace('\n',''),16)
#print("temp=%d" %temp)
print("Temperature of loopback module : %0.1fC" %(temp/256))
elif sfp_type == QSFP_DD_TYPE:
for i in QSFPDD_wattage_reg:
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + i
print(cmd)
output = Run(cmd)
print("output=%s" %output)
reg_dd = (int(output.replace('\n',''),16) >> 5)
for k,val in QSFPDD_W_dict.items():
if val == reg_dd:
print("This QSFP-DD module wattage : %0.1fW" %k)
temper = conf["qsfp_dd"]["temper_reg"]
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + temper[0]
output1 = Run(cmd)
#print(output1)
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + temper[1]
output2 = Run(cmd)
#print(output2)
temp = (int(output1.replace('\n',''),16) << 8) + int(output2.replace('\n',''),16)
#print("temp=%d" %temp)
print("Temperature of loopback module : %0.1fC" %(temp/256))
elif sfp_type == SFP_TYPE:
for i in SFP_wattage_reg:
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + i
print(cmd)
output = Run(cmd)
print("output=%s" %output)
reg_sfp = int(output.replace('\n',''),16)
for k,val in SFP_W_dict.items():
if val == reg_sfp:
print("This SFP module wattage : %0.1fW" %k)
temper = conf["sfp"]["temper_reg"]
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + temper[0]
output1 = Run(cmd)
#print(output1)
cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + temper[1]
output2 = Run(cmd)
#print(output2)
temp = (int(output1.replace('\n',''),16) << 8) + int(output2.replace('\n',''),16)
#print("temp=%d" %temp)
print("Temperature of loopback module : %0.1fC" %(temp/256))
elif operate == 'write':
wattage_str = sys.argv[3]
if wattage_str[-1] == 'w' or wattage_str[-1] == 'W':
wattage_str = wattage_str[:-1]
pattern = re.compile(r'^[-+]?[-0-9]\d*\.\d*|[-+]?\.?[0-9]\d*$')
result = pattern.match(wattage_str)
wattage = float(wattage_str)
if result:
pass
else:
show_help()
val = []
if sfp_type == QSFP_TYPE:
for key in QSFP_W_dict:
if key == wattage:
val = QSFP_W_dict[key]
if val == []:
show_help()
output_temp = ""
for i,j in zip(QSFP_wattage_reg,val):
cmd = "i2cset -y -f " + str(i2c_bus) + " " + i2c_addr + " " + i + " " + j
print(cmd)
output = Run(cmd)
output_temp += output
if output_temp == "":
print("Set QSFP module wattage %0.1fW success" %wattage)
else:
print("err=%s" %output_temp)
elif sfp_type == QSFP_DD_TYPE:
dd_val = None
for key in QSFPDD_W_dict:
if key == wattage:
dd_val = QSFPDD_W_dict[key]
if dd_val == None:
show_help()
dd_val = dd_val << 5
val = hex(dd_val)
for i in QSFPDD_wattage_reg:
cmd = "i2cset -y -f " + str(i2c_bus) + " " + i2c_addr + " " + i + " " + val
print(cmd)
output = Run(cmd)
if output == "":
print("Set QSFP-DD module wattage %0.1fW success" %wattage)
else:
print("err=%s" %output)
elif sfp_type == SFP_TYPE:
dd_val = None
for key in SFP_W_dict:
if key == wattage:
dd_val = SFP_W_dict[key]
if dd_val == None:
show_help()
val = hex(dd_val)
for i in SFP_wattage_reg:
cmd = "i2cset -y -f " + str(i2c_bus) + " " + i2c_addr + " " + i + " " + val
print(cmd)
output = Run(cmd)
if output == "":
print("Set SFP module wattage %0.1fW success" %wattage)
else:
print("err=%s" %output)
def show_help():
print("\033[1mNAME\033[0m \n"
" get and set wattage of the module\n" )
print("\033[1mSYNOPSIS\033[0m \n"
" python3 sfp.py [ get \033[4mport-num\033[0m ] [ set \033[4mport-num\033[0m \033[4mvalue\033[0m ]\n" )
print("\033[1mCOMMAND LINE OPTIONS\033[0m \n"
" \033[1mh, help\033[0m \n Display this help text\n"
" \033[1mget\033[0m \n Read wattage\n"
" \033[1mset\033[0m \n Write wattage\n" )
print("\033[1mWattage value\033[0m \n"
" \033[1mQSFP\033[0m \n 1.5W,2.0W,2.5W,3.0W,3.5W,4.0W,4.5W,5.0W,5.5W,6.0W\n"
" \033[1mQSFP-DD\033[0m \n 1.5W,3.5W,5.0W,7.0W,10.0W,12.0W,14.0W,16.0W\n"
" \033[1mSFP\033[0m \n 0W,0.5W,1.0W,1.5W,2.0W,2.5W,3.0W,3.5W,4.0W,4.5W,5.0W,5.5W,6.0W,6.5W,7.0W\n" )
print("\033[1mAUTHOR\033[0m \n"
" Written by yt\n")
sys.exit(1)
def get_argv():
global operate
port_str = ParseJson()["current_port"]
conf = ParseJson()[port_str]
port_beg = int(conf["port_begin"])
port_end = int(conf["port_sum"]) + port_beg - 1
argc = len(sys.argv)
if argc < 3 and sys.argv[1] == 'get':
print("Parameter input errors")
print("example: python3 sfp.py get 1")
sys.exit(1)
elif argc < 4 and sys.argv[1] == 'set':
print("Parameter input errors")
print("example: python3 sfp.py set 1 0x01")
sys.exit(1)
elif sys.argv[1] == 'help' or sys.argv[1] == 'h':
show_help()
port = int(sys.argv[2])
if port > port_end or port < port_beg:
print("Port num out of the range: %d-%d" %(port_beg,port_end))
sys.exit(1)
if sys.argv[1] == 'get':
operate = 'read'
elif sys.argv[1] == 'set':
operate = 'write'
print("Current port is %s" %port_str)
def port_modsel_set(val):
path_list = []
Switch_Cpld = ParseJson()["Switch_Cpld"]
sw1_path = Switch_Cpld["sw1_path"]
sw2_path = Switch_Cpld["sw2_path"]
port_modsel_1 = Switch_Cpld["port_modsel_1"]
port_modsel_2 = Switch_Cpld["port_modsel_2"]
path_list.append(sw1_path)
path_list.append(sw2_path)
for path in path_list:
cmd = "echo {} {} > {}/setreg".format(port_modsel_1,val,path)
# print(cmd)
Run(cmd)
cmd = "echo {} {} > {}/setreg".format(port_modsel_2,val,path)
# print(cmd)
Run(cmd)
def port_lpmode_set(val):
path_list = []
Switch_Cpld = ParseJson()["Switch_Cpld"]
sw1_path = Switch_Cpld["sw1_path"]
sw2_path = Switch_Cpld["sw2_path"]
port_lpmode_1 = Switch_Cpld["port_lpmode_1"]
port_lpmode_2 = Switch_Cpld["port_lpmode_2"]
path_list.append(sw1_path)
path_list.append(sw2_path)
for path in path_list:
cmd = "echo {} {} > {}/setreg".format(port_lpmode_1,val,path)
print(cmd)
Run(cmd)
cmd = "echo {} {} > {}/setreg".format(port_lpmode_2,val,path)
print(cmd)
Run(cmd)
if __name__ == '__main__':
get_argv()
port_modsel_set("0x00")
_read_eeprom_specific_bytes()
wattage_operation()
# cfg_sfp.json
{
"current_port" : "5516_port",
"sfp_type" : ["qsfp_dd","sfp"],
"5516_port":{
"port_begin" : 1,
"port_sum" : 56,
"i2c_bus_offset" : 36
},
"5517_port":{
"port_begin" : 1,
"port_sum" : 32,
"i2c_bus_offset" : 36
},
"sfp":{
"wattage_reg" : ["0xC8"],
"i2c_addr" : "0x50",
"temper_reg" : ["0x0E","0x0F"]
},
"qsfp":{
"wattage_reg" : ["0x5D","0x5E"],
"i2c_addr" : "0x50",
"temper_reg" : ["0x16","0x17"]
},
"qsfp_dd":{
"wattage_reg" : ["0xC8"],
"i2c_addr" : "0x50",
"temper_reg" : ["0x0E","0x0F"]
},
"Switch_Cpld": {
"sw1_path" : "/sys/bus/platform/devices/hq_switch/CPLD1",
"sw2_path" : "/sys/bus/platform/devices/hq_switch/CPLD2",
"port_lpmode_1" : "0x11",
"port_lpmode_2" : "0x12",
"port_modsel_1" : "0x13",
"port_modsel_2" : "0x14"
}
}
3. tlv_tool.py:\xf0是二进制数据
如下是二进制文件。
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import struct
import os
import time
import json
import subprocess
import binascii
from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__)))) #当前tlv_tool.py文件所在绝对路径的文件夹的上级文件夹
Project_path = dirname(abspath(__file__)) #获取所在路径文件夹
Conf_path = '{}/tlv_conf.json'.format(Project_path)
def ParseJson(filename=Conf_path):
with open(filename, 'r') as f:
jdata = json.load(f)
return jdata
tlv_json = ParseJson()
tlv_reg_path = tlv_json['tlv_reg_path']
tlv_eeprom_path = tlv_json['tlv_eeprom_path']
def run_command(cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if err and proc.returncode != 0:
return err
return out.decode()
# def write_reg(fpath, reg, data):
# if os.getcwd() != fpath:
# if os.path.exists(fpath) == False or os.chdir(fpath) == False:
# return 'file Not exists or file Not accessible'
# if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
# return 'write fail'
# return 0
# def read_reg(fpath, reg):
# if os.getcwd() != fpath:
# if os.path.exists(fpath) == False or os.chdir(fpath) == False:
# return 'file Not exists or file Not accessible'
# if run_command('echo {} > getreg'.format(reg)) != '':
# return 'echo reg fail'
# return run_command('cat getreg').strip()
def crc2hex(crc):
return '0x{:0>8X}'.format(binascii.crc32(crc)) #取crc32的八位数据 %x返回16进制
def unlock_tlv_eeprom(path,reg,data = '0x0'): # unlock_tlv_eeprom(tlv_reg_path,'0x06') #0x06寄存器eeprom写保护
if os.getcwd() != path:
if os.path.exists(path) == False or os.chdir(path) == False:
return 'file Not exists or file Not accessible'
if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
return 'write fail'
if run_command('echo {} > getreg'.format(reg)) != '':
return 'echo reg fail'
output = run_command('cat getreg').strip()
print("set reg {} value: {}".format(reg,output))
if output == data:
print('Unlock tlv eeprom success!') # 0x0
return True
print('Unlock tlv eeprom fail!')
return False
def tlv_read_eeprom(path,length=None,offset=None): # tlv_read_eeprom(tlv_eeprom_path,length)
with open('{}eeprom'.format(path),mode = 'rb') as f:
return f.read(length)
def tlv_read_length():
with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb') as f:
'''
tlv_header_length = char[8] + u8 + u16
'''
tlv_header_length = 11
tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", f.read(tlv_header_length)) #读11字节
return tlv_header_length + tlv_totallen
def tlv_dump(args): #读出eeprom信息写到tlv_dump.bin文件
length = tlv_read_length()
data = tlv_read_eeprom(tlv_eeprom_path,length)
try:
file_name = args[2]
except IndexError:
file_name = 'tlv_dump.bin'
with open(file_name,mode = 'wb') as f:
# print(data)
# print(len(data))
f.write(data)
f.flush()
print("Dump tlv data finish!")
tlv_type_dirt = {
0x21 : 'Product Name',
0x22 : 'Part Number',
0x23 : 'Serial Number',
0x24 : 'Base MAC Address',
0x25 : 'Manufacture Date',
0x26 : 'Device Version',
0x27 : 'Label Revision',
0x28 : 'Platform Name',
0x29 : 'ONIE Version',
0x2A : 'MAC Address Size',
0x2B : 'Manufacturer',
0x2C : 'Country Code',
0x2D : 'Vendor Name',
0x2E : 'Diag Version',
0x2F : 'Service Tag',
0xFD : 'Vendor Extension',
0xFE : 'CRC-32'
}
def tlv_write(args):
with open(args[2],mode = 'rb') as f:
data = f.read()
if unlock_tlv_eeprom(tlv_reg_path,'0x06'):
with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
f.write(data)
f.flush()
print("Write tlv data finish!")
def tlv_read(args):
if len(args) == 3 and '.bin' in args[2]:
path = args[2]
else:
path = '{}eeprom'.format(tlv_eeprom_path)
with open(path,mode = 'rb') as f:
'''
tlv_header_length = char[8] + u8 + u16
'''
tlv_header_length = 11
tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", f.read(tlv_header_length))
print("TlvInfo Header:")
print(" Id String : {}".format(tlv_signature.decode().strip()))
print(" Version : {}".format(tlv_version))
print(" Total Length : {}".format(tlv_totallen))
'''
data_header_length = u8 + u8
'''
print("TLV Name Code Len Value")
print("--------------------------------")
data_header_length = 2
length_cnt = 0
stored_crc = None
while length_cnt < tlv_totallen :
data_type , data_length = struct.unpack("!BB", f.read(data_header_length))
length_cnt += data_header_length + data_length
if (data_type in tlv_type_dirt.keys()):
pass
else:
continue
if tlv_type_dirt[data_type] == 'Base MAC Address':
if data_length == 6:
mac0,mac1,mac2,mac3,mac4,mac5 = struct.unpack('!BBBBBB', f.read(data_length))
data = "{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}".format(mac0,mac1,mac2,mac3,mac4,mac5)
else:
print('Base MAC Address Error Length {}!'.format(data_length))
exit
elif tlv_type_dirt[data_type] == 'Device Version':
if data_length == 1:
data = struct.unpack('!B', f.read(data_length))
data = data[0]
else:
print('Device Version Error Length {}!'.format(data_length))
exit
elif tlv_type_dirt[data_type] == 'MAC Address Size':
if data_length == 2:
byte0,byte1 = struct.unpack('!BB', f.read(data_length))
data = (byte0 << 8) | byte1
else:
print('MAC Address Size Error Length {}!'.format(data_length))
exit
elif tlv_type_dirt[data_type] == 'Vendor Extension':
if data_length > 0:
vendor_cmd = '!'
for index in range(data_length):
vendor_cmd += "B"
byte_tuple = struct.unpack(vendor_cmd, f.read(data_length))
data = ''
for byte in byte_tuple:
data += '0x{:0>2X} '.format(byte)
else:
data = 'NULL'
elif tlv_type_dirt[data_type] == 'CRC-32':
if data_length == 4:
crc0,crc1,crc2,crc3 = struct.unpack('!BBBB', f.read(data_length))
data = "0x{:0>2X}{:0>2X}{:0>2X}{:0>2X}".format(crc0,crc1,crc2,crc3)
stored_crc = eval(data)
else:
print('CRC Error Length {}!'.format(data_length))
exit
else:
if data_length > 0:
data = struct.unpack('!{}s'.format(data_length), f.read(data_length))
data = data[0].decode().strip()
else:
# data = None
# data = ''
data = 'NULL'
print("{:<16} 0x{:x} {:>3} : {}".format(tlv_type_dirt[data_type],data_type,data_length,data))
if stored_crc != None:
f.seek(0,0)
crc_data = f.read(tlv_header_length + tlv_totallen - 4)
# print(crc_data)
calc_crc = eval(crc2hex(crc_data))
if calc_crc == stored_crc:
print('Checksum is valid')
else:
print('Checksum is invalid')
else:
print('No Checksum Data')
def tlv_update(args):
try:
file_name = args[2]
except IndexError:
file_name = None
tlv_data = tlv_json['tlv_data']
'''
tlv_header_length = char[8] + u8 + u16
'''
tlv_signature = tlv_json['tlv_signature'].encode()
if len(tlv_signature) > 8:
print('tlv signature length out off range !!!')
exit
while len(tlv_signature) < 8:
tlv_signature += b'\x00'
tlv_version = tlv_json['tlv_version'].to_bytes(1,'big')
tlv_totallen = 0x00
data_list = [] #一般都在for上面定义一个空列表
for item in tlv_type_dirt:
title = tlv_type_dirt[item].replace(' ','_')
if (title in tlv_data.keys()):
pass
else:
if tlv_type_dirt[item] == 'Manufacture Date':
pass
else:
continue
if tlv_type_dirt[item] == 'Base MAC Address':
mac_list = tlv_data[title].split(':')
mac_code = b''
for mac_item in mac_list:
mac_code += eval('0x{}'.format(mac_item)).to_bytes(1,'big')
data = mac_code
elif tlv_type_dirt[item] == 'Manufacture Date':
data = time.strftime("%d-%m-%Y %H:%M:%S", time.localtime()).encode()
elif tlv_type_dirt[item] == 'Device Version':
data = tlv_data[title].to_bytes(1,'big')
elif tlv_type_dirt[item] == 'MAC Address Size':
data = tlv_data[title].to_bytes(2,'big')
elif tlv_type_dirt[item] == 'Vendor Extension':
vendor_list = tlv_data[title].split()
vendor_code = b''
for vendor_item in vendor_list:
vendor_code += eval(vendor_item).to_bytes(1,'big')
data = vendor_code
else:
data = tlv_data[title]
if data != 'None' and data != '':
data = data.encode()
else:
data = b''
type_code = item.to_bytes(1,'big')
len_code = len(data).to_bytes(1,'big')
data_str = type_code + len_code + data
data_list.append(data_str) ##################
for item in data_list:
tlv_totallen += len(item)
'''
tlv_totallen add crc data str length
'''
tlv_totallen = (tlv_totallen + 6).to_bytes(2,'big')
tlv_final_data = tlv_signature + tlv_version + tlv_totallen
for data_item in data_list:
tlv_final_data += data_item
'''
add crc data header and length
'''
tlv_final_data += (0xFE).to_bytes(1,'big') + (0x04).to_bytes(1,'big')
'''
add crc data
'''
crc_data = eval(crc2hex(tlv_final_data)).to_bytes(4,'big')
tlv_final_data += crc_data
# print(tlv_final_data)
if file_name == None:
if unlock_tlv_eeprom(tlv_reg_path,'0x84'):
with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
f.write(tlv_final_data)
f.flush()
print("update tlv data finish!")
else:
with open(file_name,mode = 'wb') as f:
f.write(tlv_final_data)
f.flush()
print("update tlv data bin finish!")
def read_help(args):
print('usage: {} [OPTIONS]'.format(args[0]))
print('Options are:')
print(' -w --write write tlv bin to eeprom')
print(' -d --dump dump tlv bin from eeprom')
print(' -r --read read tlv data from eeprom')
print(' -u --update update tlv data to eeprom or create new tlv bin')
print(' -h --help Display this help text and exit')
print('Example:')
print(' --write <bin name>')
print(' --dump <bin name>')
print(' --update <bin name>')
print(' just create tlv bin , but not update tlv data')
print(' --update')
print(' just update tlv data , but not create tlv bin')
print(' --read')
op = {
'--write' : tlv_write,
'--dump' : tlv_dump,
'--read' : tlv_read,
'--update' : tlv_update,
'--help' : read_help,
'-w' : tlv_write,
'-d' : tlv_dump,
'-r' : tlv_read,
'-u' : tlv_update,
'-h' : read_help,
}
if __name__ == '__main__':
if len(sys.argv) == 1:
read_help(sys.argv)
else:
op.get(sys.argv[1], read_help)(sys.argv)
// tlv_conf.json
{
"tlv_reg_path" : "/sys/bus/platform/devices/hq_switch/cpld_lc/",
"tlv_eeprom_path" : "/sys/bus/i2c/devices/0-0056/",
"tlv_signature": "TlvInfo",
"tlv_version": 1,
"tlv_data": {
"Product_Name": "ZSR5517",
"Part_Number": "ZSR5517",
"Serial_Number": "SN number",
"Base_MAC_Address": "00:E0:EC:00:00:00",
"Device_Version": 1,
"Label_Revision": "ZSR5517",
"Platform_Name": "x86_64-ZSR5517-r0",
"ONIE_Version": "0.0.1",
"MAC_Address_Size": 260,
"Manufacturer": "None",
"Country_Code": "CHN",
"Vendor_Name": "None",
"Diag_Version": "1.4.0",
"Service_Tag": "LB",
"Vendor_Extension": "0x00 0x00 0x00"
}
}