0
点赞
收藏
分享

微信扫一扫

【Note15】u-boot/kernel/busybox编译部署,读写光模块瓦数和温度,tlv读取

乌龙茶3297 2022-04-13 阅读 22

文章目录


1.编译最新的u-boot 2019.10,linux-kernel 5.3.6并用busybox打包根文件系统:成功在全志H5芯片上启动起来

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.1 u-boot:…_defconfig文件生成.config文件再make编译

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
如下是gcc用来编译uboot。
在这里插入图片描述
在这里插入图片描述
nano .bashrc
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
执行如上后,把SD卡从读卡器拔下插到派上。
在这里插入图片描述

1.2 kernel:先make config再make生成文件拷进去

在这里插入图片描述
如下 .dtb 就是编译生成的设备树二进制文件，.dts 是对应的设备树源文件。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.3 busybox:先make config再make,make install,创建文件

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
如上创建了文件夹,把所有东西打包进来。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
如上拔出SD卡,插入到派上。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.读写光模块瓦数和温度:SFP类型的光模块控制:支持对TX Fault,MOD_ABS,RX_LOS,TX Disable信号的实时电平读取;支持对TX Disable信号的电平控制

在这里插入图片描述

#!/usr/bin/env python3
# coding: UTF-8
# sfp.py
import sys
import datetime
import time
import math
import json
import re
import os
from os.path import dirname, abspath
from subprocess import PIPE, Popen

# Identifier codes (written as lowercase hex strings) grouped by module family.
SFP_TYPE_CODE_LIST = ['03', '1b']  # '03': SFP/SFP+/SFP28
QSFP_TYPE_CODE_LIST = ['0d', '11']  # '0d': QSFP+ or later, '11': QSFP28 or later
QSFP_DD_TYPE_CODE_LIST = ['18', '1e']  # '18': QSFP-DD Double Density 8X Pluggable Transceiver

# Family names stored in the global `sfp_type`.
SFP_TYPE, QSFP_TYPE, QSFP_DD_TYPE = "SFP", "QSFP", "QSFP_DD"

# Runtime state: detected module family and requested operation.
sfp_type = operate = None

# The configuration file is looked up relative to the current working
# directory (an earlier variant derived it from this file's location).
Conf_path = "./cfg_sfp.json"

# "   QSFP                 1.5W,2.0W,2.5W,3.0W,3.5W,4.0W,4.5W,5.0W,5.5W,6.0W\n"
# "   QSFP-DD              1.5W,3.5W,5.0W,7.0W,10.0W,12.0W,14.0W,16.0W\n"		
# QSFP: watts -> the pair of strings associated with the two wattage registers.
QSFP_W_dict = {
    watts: [first, second]
    for watts, first, second in (
        (1.5, "0x00", "0x00"),
        (2.0, "0x01", "0x00"),
        (2.5, "0x02", "0x00"),
        (3.0, "0x04", "0x00"),
        (3.5, "0x08", "0x00"),
        (4.0, "0x10", "0x00"),
        (4.5, "0x20", "0x00"),
        (5.0, "0x40", "0x00"),
        (5.5, "0x80", "0x00"),
        (6.0, "0x80", "0x08"),
    )
}

# QSFP-DD: watts -> integer code (0..7, in the order listed).
QSFPDD_W_dict = {
    watts: code
    for code, watts in enumerate((1.5, 3.5, 5.0, 7.0, 10.0, 12.0, 14.0, 16.0))
}

# SFP: watts -> integer code, 0.0W..7.0W in 0.5W steps (codes 0..14).
SFP_W_dict = {step * 0.5: step for step in range(15)}

def ParseJson(filename=Conf_path):
    """Parse the JSON file at *filename* and return the decoded object.

    Defaults to the module-level ``Conf_path``.
    """
    with open(filename, 'r') as handle:
        return json.load(handle)

def Run(cmd, retype="r"):
    '''Run a shell command and return the part of its result chosen by *retype*.

    retype values:
        "r"   - the whole of stdout as one string (default)
        "rl"  - the first line of stdout ("" when there is no output)
        "rls" - stdout as a list of lines, newlines kept
        "rc"  - the process exit status
        "re"  - stdout followed by stderr

    Any failure (including an unknown *retype*) is printed in red and the
    function returns None, so callers must be prepared for a None result.
    '''
    try:
        with Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, encoding="utf-8") as proc:
            # communicate() drains both pipes concurrently. Reading only
            # stdout (or only waiting) can deadlock once the child fills the
            # other pipe's buffer.
            out, err = proc.communicate()
            if retype == 're':
                return out + err
            if retype == 'r':
                return out
            if retype == 'rc':
                return proc.returncode
            if retype in ('rl', 'rls'):
                # Split on "\n" only, keeping the terminator, to mirror
                # readline()/readlines() on a text stream.
                pieces = out.split("\n")
                lines = [piece + "\n" for piece in pieces[:-1]]
                if pieces[-1]:
                    lines.append(pieces[-1])
                if retype == 'rls':
                    return lines
                return lines[0] if lines else ""
            raise KeyError(retype)
    except Exception as e:
        print("\033[31mExecute Err:%s\033[0m"%e)

def _read_eeprom_specific_bytes():
    """Read the first EEPROM bytes of the selected port and detect its type.

    The port number comes from ``sys.argv[2]``; the I2C bus is that number
    plus the configured ``i2c_bus_offset``. For every family listed under
    ``sfp_type`` in the configuration, the matching sysfs ``eeprom`` file is
    read and byte 0 is compared with the type-code lists. On a match the
    global ``sfp_type`` is set.

    Returns:
        A 50-element list. Entries 0-4 hold the integer values of the first
        five bytes from the last EEPROM that was read successfully; the other
        entries keep the placeholder string "0x00".
    """
    global sfp_type

    conf = ParseJson()  # parse the configuration once, not once per lookup
    port_conf = conf[conf["current_port"]]
    i2c_bus = int(sys.argv[2]) + int(port_conf["i2c_bus_offset"])

    # Build the candidate paths, dropping duplicates but keeping config order.
    eeprom_list = []
    for family in conf["sfp_type"]:
        i2c_addr = conf[family]["i2c_addr"]
        device_node = str(i2c_bus) + "-" + i2c_addr[2:].zfill(4)
        eeprom_path = "/sys/bus/i2c/devices/" + device_node + "/eeprom"
        if eeprom_path not in eeprom_list:
            eeprom_list.append(eeprom_path)
    print(eeprom_list)

    eeprom_raw = ["0x00"] * 50
    for eeprom_path in eeprom_list:
        try:
            with open(eeprom_path, mode="rb", buffering=0) as eeprom:
                raw = eeprom.read(5)
        except OSError as e:
            print(e)
            continue
        if len(raw) < 5:
            print("short read from %s: got %d of 5 bytes" % (eeprom_path, len(raw)))
            continue

        eeprom_raw[:5] = raw
        # Two-digit, zero-padded hex: the code lists contain '03' and '0d',
        # which an unpadded hex() string ('3', 'd') can never match.
        identifier = format(raw[0], "02x")
        print("first data = %s" % identifier)
        if identifier in QSFP_TYPE_CODE_LIST:
            sfp_type = QSFP_TYPE
        elif identifier in SFP_TYPE_CODE_LIST:
            sfp_type = SFP_TYPE
        elif identifier in QSFP_DD_TYPE_CODE_LIST:
            sfp_type = QSFP_DD_TYPE

    return eeprom_raw

def _i2c_get(i2c_bus, i2c_addr, reg, verbose=True):
    """Read one register with i2cget and return its output without newlines.

    Returns "" when the command could not be run or produced no output.
    """
    cmd = "i2cget -y -f " + str(i2c_bus) + " " + i2c_addr + " " + reg
    if verbose:
        print(cmd)
    output = Run(cmd)
    if verbose:
        print("output=%s" % output)
    return (output or "").replace('\n', '')


def _lookup_wattage(module_type, raw_values):
    """Translate raw wattage register strings into watts, or None if unknown."""
    if module_type == QSFP_TYPE:
        # QSFP uses the full list of register strings as the lookup value.
        for watts, pattern in QSFP_W_dict.items():
            if pattern == raw_values:
                return watts
        return None
    try:
        code = int(raw_values[-1], 16)
    except (IndexError, ValueError):
        return None
    if module_type == QSFP_DD_TYPE:
        code >>= 5  # the code occupies the top three bits of the register
        table = QSFPDD_W_dict
    else:
        table = SFP_W_dict
    for watts, value in table.items():
        if value == code:
            return watts
    return None


def _print_temperature(i2c_bus, i2c_addr, temper_regs):
    """Read the two temperature registers (MSB, LSB) and print degrees Celsius."""
    msb = _i2c_get(i2c_bus, i2c_addr, temper_regs[0], verbose=False)
    lsb = _i2c_get(i2c_bus, i2c_addr, temper_regs[1], verbose=False)
    try:
        raw = (int(msb, 16) << 8) + int(lsb, 16)
    except ValueError:
        print("Failed to read module temperature")
        return
    # The 16-bit reading is two's complement in units of 1/256 C, so values
    # with the top bit set are negative temperatures.
    if raw & 0x8000:
        raw -= 0x10000
    print("Temperature of loopback module : %0.1fC" % (raw / 256))


def wattage_operation():
    """Read or write the wattage setting of the module on the selected port.

    Uses the globals ``sfp_type`` (module family) and ``operate`` ('read' or
    'write'), the port number in ``sys.argv[2]`` and, for writes, the
    wattage in ``sys.argv[3]`` (an optional trailing 'w'/'W' is accepted).

    'read' prints the wattage matching the register contents and the module
    temperature. 'write' programs the register value(s) for the requested
    wattage; an invalid or unsupported wattage shows the help text, which
    exits the program.
    """
    conf = ParseJson()
    port = int(sys.argv[2])   # python3 sfp.py get 1  (1 = port number)
    port_conf = conf[conf["current_port"]]
    i2c_bus = port + int(port_conf["i2c_bus_offset"])

    # module family -> (config section, label used in messages)
    families = {
        QSFP_TYPE: ("qsfp", "QSFP"),
        SFP_TYPE: ("sfp", "SFP"),
        QSFP_DD_TYPE: ("qsfp_dd", "QSFP-DD"),
    }
    if sfp_type not in families:
        # Nothing can be addressed without a known family.
        print("This SFP type is not found")
        return
    section, label = families[sfp_type]
    print("This SFP type: %s " % sfp_type)

    module_conf = conf[section]
    i2c_addr = module_conf["i2c_addr"]
    wattage_regs = module_conf["wattage_reg"]

    if operate == 'read':
        raw_values = [_i2c_get(i2c_bus, i2c_addr, reg) for reg in wattage_regs]
        watts = _lookup_wattage(sfp_type, raw_values)
        if watts is None:
            print("Unrecognized wattage register value: %s" % raw_values)
        else:
            print("This %s module wattage : %0.1fW" % (label, watts))
        _print_temperature(i2c_bus, i2c_addr, module_conf["temper_reg"])

    elif operate == 'write':
        wattage_str = sys.argv[3]
        if wattage_str[-1:] in ('w', 'W'):
            wattage_str = wattage_str[:-1]
        try:
            wattage = float(wattage_str)
        except ValueError:
            show_help()  # exits
            return

        if sfp_type == QSFP_TYPE:
            values = QSFP_W_dict.get(wattage)
        else:
            if sfp_type == QSFP_DD_TYPE:
                code, shift = QSFPDD_W_dict.get(wattage), 5
            else:
                code, shift = SFP_W_dict.get(wattage), 0
            # The same value goes to every configured wattage register.
            values = None if code is None else [hex(code << shift)] * len(wattage_regs)
        if values is None:
            show_help()  # exits
            return

        failures = []
        for reg, value in zip(wattage_regs, values):
            cmd = "i2cset -y -f " + str(i2c_bus) + " " + i2c_addr + " " + reg + " " + value
            print(cmd)
            # Judge success by exit status: errors are reported on stderr,
            # so an empty stdout does not prove the write worked.
            status = Run(cmd, "rc")
            if status != 0:
                failures.append("i2cset %s exited with status %s" % (reg, status))
        if failures:
            print("err=%s" % "; ".join(failures))
        else:
            print("Set %s module wattage %0.1fW success" % (label, wattage))

def show_help():
    """Print the manual-style usage text and exit with status 1."""
    def bold(text):
        return "\033[1m" + text + "\033[0m"

    def underline(text):
        return "\033[4m" + text + "\033[0m"

    def entry(name, text):
        # One indented item: bold name on its own line, description below.
        return "  " + bold(name) + " \n      " + text + "\n"

    sections = (
        bold("NAME") + " \n   get and set wattage of the module\n",
        bold("SYNOPSIS") + " \n   python3 sfp.py [ get " + underline("port-num")
        + " ] [ set " + underline("port-num") + " " + underline("value") + " ]\n",
        bold("COMMAND LINE OPTIONS") + " \n"
        + entry("h, help", "Display this help text")
        + entry("get", "Read wattage")
        + entry("set", "Write wattage"),
        bold("Wattage value") + " \n"
        + entry("QSFP", "1.5W,2.0W,2.5W,3.0W,3.5W,4.0W,4.5W,5.0W,5.5W,6.0W")
        + entry("QSFP-DD", "1.5W,3.5W,5.0W,7.0W,10.0W,12.0W,14.0W,16.0W")
        + entry("SFP", "0W,0.5W,1.0W,1.5W,2.0W,2.5W,3.0W,3.5W,4.0W,4.5W,5.0W,5.5W,6.0W,6.5W,7.0W"),
        bold("AUTHOR") + " \n      Written by yt\n",
    )
    for section in sections:
        print(section)
    sys.exit(1)

def get_argv():
    """Validate the command line and record the requested operation.

    Accepted forms:
        sfp.py get <port>
        sfp.py set <port> <wattage>
        sfp.py help | h

    On success the global ``operate`` is set to 'read' (get) or 'write'
    (set). A missing or unknown command shows the help text; missing
    arguments, a non-integer port, or a port outside the configured range
    print a message. All of these exit with status 1.
    """
    global operate

    argc = len(sys.argv)
    # No command at all is treated like a request for help.
    command = sys.argv[1] if argc > 1 else 'help'

    if command == 'get':
        if argc < 3:
            print("Parameter input errors")
            print("example: python3 sfp.py get 1")
            sys.exit(1)
    elif command == 'set':
        if argc < 4:
            print("Parameter input errors")
            # The value is a wattage (see show_help), not a register value.
            print("example: python3 sfp.py set 1 1.5W")
            sys.exit(1)
    else:
        # 'help', 'h' and anything unrecognised.
        show_help()

    try:
        port = int(sys.argv[2])
    except ValueError:
        print("Port num must be an integer: %s" % sys.argv[2])
        sys.exit(1)

    # The configuration is only needed once the arguments look sane, so
    # help stays available when the config file is missing.
    settings = ParseJson()
    port_str = settings["current_port"]
    conf = settings[port_str]
    port_beg = int(conf["port_begin"])
    port_end = int(conf["port_sum"]) + port_beg - 1
    if port > port_end or port < port_beg:
        print("Port num out of the range: %d-%d" %(port_beg,port_end))
        sys.exit(1)

    operate = 'read' if command == 'get' else 'write'
    print("Current port is %s" %port_str)

def port_modsel_set(val):
    """Write *val* to the two ``port_modsel`` registers of both switch CPLDs.

    Paths and register addresses come from the ``Switch_Cpld`` section of the
    configuration. Each write is an ``echo <reg> <val>`` redirected into the
    CPLD's ``setreg`` file.
    """
    cpld = ParseJson()["Switch_Cpld"]
    cpld_dirs = (cpld["sw1_path"], cpld["sw2_path"])
    registers = (cpld["port_modsel_1"], cpld["port_modsel_2"])
    for cpld_dir in cpld_dirs:
        for reg in registers:
            Run("echo {} {} > {}/setreg".format(reg, val, cpld_dir))

def port_lpmode_set(val):
    """Write *val* to the two ``port_lpmode`` registers of both switch CPLDs.

    Paths and register addresses come from the ``Switch_Cpld`` section of the
    configuration. Each command is printed before it is run.
    """
    cpld = ParseJson()["Switch_Cpld"]
    cpld_dirs = (cpld["sw1_path"], cpld["sw2_path"])
    registers = (cpld["port_lpmode_1"], cpld["port_lpmode_2"])
    for cpld_dir in cpld_dirs:
        for reg in registers:
            command = "echo {} {} > {}/setreg".format(reg, val, cpld_dir)
            print(command)
            Run(command)

if __name__ == '__main__':
	# Validate the command line and set the global `operate`.
	get_argv()
	# Write 0x00 to both port_modsel registers on each CPLD before the module is accessed.
	port_modsel_set("0x00")
	# Read the EEPROM identifier byte to set the global `sfp_type`.
	_read_eeprom_specific_bytes()
	# Perform the requested wattage read or write.
	wattage_operation()
# cfg_sfp.json
{
    "current_port"          : "5516_port",
    "sfp_type"              : ["qsfp_dd","sfp"],
    "5516_port":{
        "port_begin"        : 1,
        "port_sum"          : 56,
        "i2c_bus_offset"    : 36
    },
    "5517_port":{
        "port_begin"        : 1,
        "port_sum"          : 32,
        "i2c_bus_offset"    : 36
    },
    "sfp":{
        "wattage_reg"       : ["0xC8"],
        "i2c_addr"          : "0x50",
        "temper_reg"        : ["0x0E","0x0F"]
    },
    "qsfp":{
        "wattage_reg"       : ["0x5D","0x5E"],
        "i2c_addr"          : "0x50",
        "temper_reg"        : ["0x16","0x17"]
    },
    "qsfp_dd":{
        "wattage_reg"       : ["0xC8"],
        "i2c_addr"          : "0x50",
        "temper_reg"        : ["0x0E","0x0F"]
    },
    "Switch_Cpld": {
        "sw1_path"          : "/sys/bus/platform/devices/hq_switch/CPLD1",
        "sw2_path"          : "/sys/bus/platform/devices/hq_switch/CPLD2",
        "port_lpmode_1"     : "0x11",
        "port_lpmode_2"     : "0x12",
        "port_modsel_1"     : "0x13",
        "port_modsel_2"     : "0x14"
    }
}

在这里插入图片描述

3. tlv_tool.py:\xf0是二进制数据

在这里插入图片描述
在这里插入图片描述
如下是二进制文件。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import struct
import os
import time
import json
import subprocess
import binascii
from os.path import dirname, abspath

sys.path.append(dirname(dirname(abspath(__file__))))  # add the parent of this file's directory to the import path
Project_path = dirname(abspath(__file__))  # directory that contains this file
Conf_path = '{}/tlv_conf.json'.format(Project_path)

def ParseJson(filename=Conf_path):
    """Parse the JSON file at *filename* (default: ``Conf_path``) and return it."""
    with open(filename, 'r') as handle:
        return json.load(handle)
# The configuration is loaded once, when the module is imported.
tlv_json = ParseJson()
# Both paths are used as prefixes ('{}eeprom'.format(...)), so the configured
# values are expected to end with a path separator.
tlv_reg_path = tlv_json['tlv_reg_path']
tlv_eeprom_path = tlv_json['tlv_eeprom_path']

def run_command(cmd):
    """Run *cmd* through the shell and return its output as text.

    Returns the decoded stderr when the command exits non-zero and wrote to
    stderr; otherwise the decoded stdout. The result is always a ``str``
    (undecodable bytes are replaced), so callers can compare and strip it
    without caring which stream it came from.
    """
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if err and proc.returncode != 0:
        return err.decode(errors='replace')
    return out.decode(errors='replace')

# def write_reg(fpath, reg, data):
#     if os.getcwd() != fpath:
#         if os.path.exists(fpath) == False or os.chdir(fpath) == False:
#             return 'file Not exists or file Not accessible'
#     if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
#         return 'write fail'
#     return 0

# def read_reg(fpath, reg):
#     if os.getcwd() != fpath:
#         if os.path.exists(fpath) == False or os.chdir(fpath) == False:
#             return 'file Not exists or file Not accessible'
#     if run_command('echo {} > getreg'.format(reg)) != '':
#         return 'echo reg fail'
#     return run_command('cat getreg').strip()

def crc2hex(crc):
    """Return the CRC-32 of the bytes *crc* as '0x' plus 8 uppercase hex digits."""
    checksum = binascii.crc32(crc)
    return '0x%08X' % checksum

def unlock_tlv_eeprom(path,reg,data = '0x0'):
    """Write *data* to register *reg* and confirm it by reading the register back.

    The register is accessed through the ``setreg`` / ``getreg`` files inside
    *path*: "<reg> <data>" is written to ``setreg``, "<reg>" is written to
    ``getreg`` to select the register, and ``getreg`` is then read.

    Example from the write path: unlock_tlv_eeprom(tlv_reg_path, '0x06').
    # NOTE(review): the update path passes '0x84' instead - confirm which
    # register is the write-protect control on the target board.

    Returns:
        True when the value read back equals *data*, False otherwise. This
        includes the case where the register files cannot be accessed.
        Callers test the result for truthiness, so failures must be falsy.
    """
    setreg = os.path.join(path, 'setreg')
    getreg = os.path.join(path, 'getreg')
    try:
        # Plain file writes replace `cd` + `echo ... > file`; they need no
        # shell and leave the process working directory untouched.
        with open(setreg, 'w') as f:
            f.write('{} {}\n'.format(reg, data))
        with open(getreg, 'w') as f:
            f.write('{}\n'.format(reg))
        with open(getreg, 'r') as f:
            output = f.read().strip()
    except OSError as err:
        print('Unlock tlv eeprom fail! ({})'.format(err))
        return False

    print("set reg {} value: {}".format(reg,output))
    try:
        # Compare numerically so that e.g. '0x00' and '0x0' are equal.
        matched = int(output, 0) == int(data, 0)
    except ValueError:
        matched = output == data
    if matched:
        print('Unlock tlv eeprom success!')  # 0x0
        return True
    print('Unlock tlv eeprom fail!')
    return False

def tlv_read_eeprom(path,length=None,offset=None):
    """Read bytes from the ``eeprom`` file found under *path*.

    Args:
        path: Directory prefix; 'eeprom' is appended directly, so it must
            end with a path separator.
        length: Number of bytes to read; None reads to the end of the file.
        offset: Byte position to start reading from; None or 0 starts at the
            beginning.

    Returns:
        The bytes read.
    """
    with open('{}eeprom'.format(path),mode = 'rb') as f:
        if offset:
            f.seek(offset)
        return f.read(length)

def tlv_read_length():
    """Return the size in bytes of the TLV blob stored in the EEPROM.

    The result is the 11-byte header (char[8] signature + u8 version + u16
    total length) plus the value of the header's total-length field.
    """
    header_size = 11
    with open('{}eeprom'.format(tlv_eeprom_path), mode='rb') as eeprom:
        header = eeprom.read(header_size)
    # Only the third field (big-endian u16 total length) is needed.
    payload_size = struct.unpack("!8sBH", header)[2]
    return header_size + payload_size

def tlv_dump(args):
    """Copy the TLV blob from the EEPROM into a binary file.

    The output name is ``args[2]`` when given, otherwise 'tlv_dump.bin'.
    """
    blob = tlv_read_eeprom(tlv_eeprom_path, tlv_read_length())
    target = args[2] if len(args) > 2 else 'tlv_dump.bin'
    with open(target, mode='wb') as out_file:
        out_file.write(blob)
    print("Dump tlv data finish!")

# TLV type code -> field name, in the order the fields are written.
tlv_type_dirt = dict((
    (0x21, 'Product Name'),
    (0x22, 'Part Number'),
    (0x23, 'Serial Number'),
    (0x24, 'Base MAC Address'),
    (0x25, 'Manufacture Date'),
    (0x26, 'Device Version'),
    (0x27, 'Label Revision'),
    (0x28, 'Platform Name'),
    (0x29, 'ONIE Version'),
    (0x2A, 'MAC Address Size'),
    (0x2B, 'Manufacturer'),
    (0x2C, 'Country Code'),
    (0x2D, 'Vendor Name'),
    (0x2E, 'Diag Version'),
    (0x2F, 'Service Tag'),
    (0xFD, 'Vendor Extension'),
    (0xFE, 'CRC-32'),
))

def tlv_write(args):
    """Write the binary file named by ``args[2]`` to the start of the EEPROM.

    Nothing is written unless unlock_tlv_eeprom() returns a truthy value.
    """
    with open(args[2], mode='rb') as source:
        image = source.read()
    if not unlock_tlv_eeprom(tlv_reg_path, '0x06'):
        return
    with open('{}eeprom'.format(tlv_eeprom_path), mode='rb+') as eeprom:
        eeprom.write(image)
        eeprom.flush()
        print("Write tlv data finish!")

def _decode_tlv_value(name, raw):
    """Return the printable value of one TLV field.

    Prints an error and returns None when *raw* has the wrong length for a
    fixed-size field.
    """
    size = len(raw)
    if name == 'Base MAC Address':
        if size != 6:
            print('Base MAC Address Error Length {}!'.format(size))
            return None
        return ':'.join('{:0>2X}'.format(byte) for byte in raw)
    if name == 'Device Version':
        if size != 1:
            print('Device Version Error Length {}!'.format(size))
            return None
        return raw[0]
    if name == 'MAC Address Size':
        if size != 2:
            print('MAC Address Size Error Length {}!'.format(size))
            return None
        return int.from_bytes(raw, 'big')
    if name == 'Vendor Extension':
        if not raw:
            return 'NULL'
        return ''.join('0x{:0>2X} '.format(byte) for byte in raw)
    if name == 'CRC-32':
        if size != 4:
            print('CRC Error Length {}!'.format(size))
            return None
        return '0x{:0>8X}'.format(int.from_bytes(raw, 'big'))
    if not raw:
        return 'NULL'
    return raw.decode(errors='replace').strip()


def tlv_read(args):
    """Print the TLV header and fields and verify the stored CRC-32.

    The data is read from ``args[2]`` when exactly three arguments are given
    and that name contains '.bin'; otherwise from the EEPROM.

    Header layout: char[8] signature + u8 version + u16 total length
    (11 bytes). Each field: u8 type + u8 length + value. Fields with an
    unknown type or an invalid length are skipped, but their payload is
    always consumed so the following fields stay aligned.
    """
    if len(args) == 3 and '.bin' in args[2]:
        path = args[2]
    else:
        path = '{}eeprom'.format(tlv_eeprom_path)

    tlv_header_length = 11
    data_header_length = 2
    with open(path,mode = 'rb') as f:
        header = f.read(tlv_header_length)
        if len(header) < tlv_header_length:
            print('TlvInfo header is truncated')
            return
        tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", header)
        # The signature is NUL-padded to 8 bytes; drop the padding for display.
        id_string = tlv_signature.rstrip(b'\x00').decode(errors='replace').strip()
        print("TlvInfo Header:")
        print("    Id String    : {}".format(id_string))
        print("    Version      : {}".format(tlv_version))
        print("    Total Length : {}".format(tlv_totallen))
        print("TLV Name         Code Len  Value")
        print("--------------------------------")

        length_cnt = 0
        stored_crc = None
        while length_cnt < tlv_totallen :
            entry_header = f.read(data_header_length)
            if len(entry_header) < data_header_length:
                print('TLV data is truncated')
                break
            data_type , data_length = struct.unpack("!BB", entry_header)
            # Read the payload before deciding what to do with it: skipping
            # a field must still advance the file position past its value.
            raw = f.read(data_length)
            if len(raw) < data_length:
                print('TLV data is truncated')
                break
            length_cnt += data_header_length + data_length

            name = tlv_type_dirt.get(data_type)
            if name is None:
                continue
            data = _decode_tlv_value(name, raw)
            if data is None:
                continue
            if name == 'CRC-32':
                stored_crc = int(data, 16)

            print("{:<16} 0x{:x} {:>3} : {}".format(name,data_type,data_length,data))

        if stored_crc is not None:
            # The checksum covers everything before the 4-byte CRC value.
            f.seek(0,0)
            crc_data = f.read(tlv_header_length + tlv_totallen - 4)
            if binascii.crc32(crc_data) == stored_crc:
                print('Checksum is valid')
            else:
                print('Checksum is invalid')
        else:
            print('No Checksum Data')

def _encode_tlv_value(name, value):
    """Encode one configured TLV value as bytes according to its field name."""
    if name == 'Base MAC Address':
        # "AA:BB:..." -> one byte per hex octet.
        return bytes(int(octet, 16) for octet in value.split(':'))
    if name == 'Device Version':
        return value.to_bytes(1,'big')
    if name == 'MAC Address Size':
        return value.to_bytes(2,'big')
    if name == 'Vendor Extension':
        # Whitespace-separated integer literals such as "0x00 0x01".
        return bytes(int(item, 0) for item in value.split())
    if value == 'None' or value == '':
        return b''
    return value.encode()


def tlv_update(args):
    """Build a TLV blob from the configuration and store it.

    With ``args[2]`` present the blob is written to that file. Without it,
    the blob is written to the EEPROM after unlock_tlv_eeprom() succeeds.

    Layout: char[8] signature + u8 version + u16 total length, then one
    (u8 type, u8 length, value) entry per configured field in
    ``tlv_type_dirt`` order, then the CRC-32 entry (type 0xFE, length 4).
    'Manufacture Date' is always emitted, using the current local time.

    Exits with status 1 when the signature is longer than 8 bytes or a value
    does not fit the one-byte length field.
    """
    file_name = args[2] if len(args) > 2 else None
    tlv_data = tlv_json['tlv_data']

    tlv_signature = tlv_json['tlv_signature'].encode()
    if len(tlv_signature) > 8:
        print('tlv signature length out off range !!!')
        sys.exit(1)
    tlv_signature = tlv_signature.ljust(8, b'\x00')
    tlv_version = tlv_json['tlv_version'].to_bytes(1,'big')

    data_list = []
    for item, name in tlv_type_dirt.items():
        if name == 'CRC-32':
            continue  # appended below, after everything it covers
        title = name.replace(' ','_')
        if name == 'Manufacture Date':
            # NOTE(review): this emits DD-MM-YYYY; confirm that consumers of
            # this field expect that layout.
            data = time.strftime("%d-%m-%Y %H:%M:%S", time.localtime()).encode()
        elif title in tlv_data:
            data = _encode_tlv_value(name, tlv_data[title])
        else:
            continue
        if len(data) > 255:
            print('{} value is too long ({} bytes, max 255)'.format(name, len(data)))
            sys.exit(1)
        data_list.append(bytes([item, len(data)]) + data)

    body = b''.join(data_list)
    # The total length also counts the CRC entry: 2 header bytes + 4 value bytes.
    tlv_totallen = (len(body) + 6).to_bytes(2,'big')
    tlv_final_data = tlv_signature + tlv_version + tlv_totallen + body
    # CRC entry header (type 0xFE, length 4); the checksum covers everything
    # up to and including these two bytes.
    tlv_final_data += (0xFE).to_bytes(1,'big') + (0x04).to_bytes(1,'big')
    tlv_final_data += binascii.crc32(tlv_final_data).to_bytes(4,'big')

    if file_name is None:
        # NOTE(review): the write path unlocks register '0x06' while this
        # path uses '0x84' - confirm both are intended.
        if unlock_tlv_eeprom(tlv_reg_path,'0x84'):
            with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
                f.write(tlv_final_data)
                f.flush()
                print("update tlv data finish!")
    else:
        with open(file_name,mode = 'wb') as f:
            f.write(tlv_final_data)
            f.flush()
        print("update tlv data bin finish!")

def read_help(args):
    """Print the usage text; ``args[0]`` is shown as the program name."""
    usage_lines = (
        'usage: {} [OPTIONS]'.format(args[0]),
        'Options are:',
        '     -w      --write      write tlv bin to eeprom',
        '     -d      --dump       dump tlv bin from eeprom',
        '     -r      --read       read  tlv data from eeprom',
        '     -u      --update     update tlv data to eeprom or create new tlv bin',
        '     -h      --help       Display this help text and exit',
        'Example:',
        '             --write  <bin name>',
        '             --dump   <bin name>',
        '             --update <bin name>',
        '                  just create tlv bin , but not update tlv data',
        '             --update',
        '                  just update tlv data , but not create tlv bin',
        '             --read',
    )
    print('\n'.join(usage_lines))

# Command-line option -> handler. Every handler is called with the full
# sys.argv list, so an optional file name is found at index 2.
op = {
    '--write'  : tlv_write,
    '--dump'   : tlv_dump,
    '--read'   : tlv_read,
    '--update' : tlv_update,
    '--help'   : read_help,
    '-w'       : tlv_write,
    '-d'       : tlv_dump,
    '-r'       : tlv_read,
    '-u'       : tlv_update,
    '-h'       : read_help,
}

if __name__ == '__main__':
    # No option, or an unrecognised one, falls back to the help text.
    handler = read_help if len(sys.argv) == 1 else op.get(sys.argv[1], read_help)
    handler(sys.argv)
// tlv_conf.json
{
    "tlv_reg_path" : "/sys/bus/platform/devices/hq_switch/cpld_lc/",
    "tlv_eeprom_path" : "/sys/bus/i2c/devices/0-0056/",
    "tlv_signature": "TlvInfo",
    "tlv_version": 1, 
    "tlv_data": {
        "Product_Name":     "ZSR5517", 
        "Part_Number":      "ZSR5517",   
        "Serial_Number":    "SN number", 
        "Base_MAC_Address": "00:E0:EC:00:00:00", 
        "Device_Version":   1, 
        "Label_Revision":   "ZSR5517", 
        "Platform_Name":    "x86_64-ZSR5517-r0", 
        "ONIE_Version":     "0.0.1", 
        "MAC_Address_Size": 260, 
        "Manufacturer":     "None", 
        "Country_Code":     "CHN",
        "Vendor_Name":      "None",
        "Diag_Version":     "1.4.0",
        "Service_Tag":      "LB",
        "Vendor_Extension": "0x00 0x00 0x00"
    }
}

在这里插入图片描述
在这里插入图片描述

举报

相关推荐

0 条评论