0
点赞
收藏
分享

微信扫一扫

使用python对交换机进行排障自动化运维(锐捷)

import glob

import telnetlib

import re

from datetime import datetime

from time import sleep


import pandas as pd

import os

import time


from matplotlib import pyplot as plt





# Telnet 连接函数

def connect_telnet(hostname, username, password):

   try:

       tn = telnetlib.Telnet()

       tn.open(hostname, port=23, timeout=5)  # 尝试连接 Telnet

       print("connected......", hostname)


       # 输入用户名

       tn.read_until(b'Username:', timeout=5)

       tn.write(username.encode('ascii') + b'\n')


       # 输入密码

       tn.read_until(b'Password:', timeout=5)

       tn.write(password.encode('ascii') + b'\n')


       # 检查是否成功登录

       login_result = tn.read_until(b'#', timeout=5)

       if b'#' not in login_result:

           print('登录失败!', hostname)

           tn.close()  # 登录失败,关闭连接

           return None

       else:

           print('登录成功!', hostname)

           return tn  # 登录成功,返回 Telnet 对象

   except Exception as e:

       print(f"连接失败: {e}")

       return None  # 连接失败,返回 None



# 执行命令的函数:执行 Telnet 下的相关命令并返回结果

def execute_ip_ospf_neighbor_detail(tn, hostname):

   # 执行第一个命令:显示当前系统时间

   command1 = bytes("show clock", encoding='utf-8')

   tn.write(command1 + b'\r\n')  # 发送命令并执行

   command1_result = tn.read_until(b'#')  # 读取直到命令提示符结束的结果

   command1_result = re.findall(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+2004", command1_result.decode('GB18030'))[0]


   # 执行第二个命令:显示 OSPF 邻居详情

   command2 = bytes("show ip ospf neighbor detail", encoding='utf-8')

   tn.write(command2 + b'\r\n')  # 发送命令


   result_list = []

   # 持续读取 OSPF 命令结果,直到读取完成

   while True:

       command2_result = tn.read_very_eager().decode('ascii')

       result_list.append(command2_result)

       if re.findall(r"--More--", command2_result.strip()):  # 如果命令输出有 "--More--" 字样,按空格获取下一页

           tn.write(b" ")

       elif re.findall(r"#", command2_result.strip()):  # 如果命令输出结束,退出循环

           break

       else:

           time.sleep(1)

           continue


   result_str = "\n".join(result_list)  # 将结果列表拼接成一个字符串


   # 将命令执行的结果存储为字典

   dict_output = {}

   dict_output["host_ip"] = hostname

   dict_output["time"] = command1_result  # 存储命令 1(系统时间)结果


   # 使用正则表达式提取 OSPF 邻居详细信息

   dict_output["OSPF Router with ID"] = re.search(r'OSPF Router with ID (.+)', result_str).group(1)

   dict_output["Neighbor"] = re.search(r'Neighbor\s+(\d+.\d+.\d+.\d+)', result_str).group(1)

   dict_output["area"] = re.search(r'In the area\s+(.+)', result_str).group(1)

   dict_output["State"] = re.search(r'State\s+(\w+), ', result_str).group(1)


   # 将字典数据转换为 DataFrame 并保存为 CSV 文件

   pd_output = pd.DataFrame.from_dict([dict_output])

   pd_output['time'] = pd.to_datetime(pd_output['time'])

   pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))


   ttt = str(datetime.now())

   # pd_output.to_csv('./' + os.sep + r'ospf_neighbor_detail' + '-' + 'hostname' + '.csv', mode='a', index=None, encoding='gb18030')

   pd_output.to_csv('./' + os.sep + r'ospf_neighbor_detail' + '-' + 'hostname' + ttt +'.csv', index=None, encoding='gb18030')


# 示例调用:执行 "show interface brief" 命令

def execute_interface_brief(tn, hostname):

   command = bytes("show interface brief", encoding='utf-8')

   tn.write(command + b'\r\n')


   result_list = []

   # 持续读取命令结果,处理 "--More--" 分页输出

   while True:

       command_result = tn.read_very_eager().decode('ascii')

       result_list.append(command_result)

       if re.findall(r"--More--", command_result.strip()):

           tn.write(b" ")

       elif re.findall(r"#", command_result.strip()):

           break

       else:

           time.sleep(1)

           continue


   result_str = "\n".join(result_list)

   list_str = result_str.split('\n')  # 将结果按行分割成列表


   list_temperature_vec = []

   # 遍历结果行,查找符合正则表达式的内容

   for j in list_str:

       regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)

       if len(re.findall(r"Interface", j)) > 0:

           new_columns = list_find_str = re.split(r'\s+', j)  # 提取列名

           new_columns = new_columns[0:8]


       if len(regex.findall(j)) > 0:

           list_find_str = regex.findall(j)[0]

           list_find_str = re.split(r'\s+', list_find_str)  # 提取数据行

           list_temperature_vec.append(list_find_str)


   pd_result = pd.DataFrame(list_temperature_vec)

   pd_result.columns = new_columns  # 设置 DataFrame 列名

   pd_result.to_csv('./' + os.sep + r'interface_brief' + '-' + str(hostname)+ str(datetime.now()) + '.csv', encoding='gb18030')  # 保存结果

   return pd_result  # 返回结果 DataFrame



# 示例调用:光功率1:执行 "show opticalinfo brief" 命令

def execute_opticalinfo_brief(tn, hostname):

   command = bytes("show opticalinfo brief", encoding='utf-8')

   tn.write(command + b'\r\n')


   result_list = []

   # 持续读取命令结果,处理 "--More--" 分页输出

   while True:

       command_result = tn.read_very_eager().decode('ascii')

       result_list.append(command_result)

       if re.findall(r"--More--", command_result.strip()):

           tn.write(b" ")

       elif re.findall(r"#", command_result.strip()):

           break

       else:

           time.sleep(1)

           continue


   result_str = "\n".join(result_list)

   list_str = result_str.split('\n')  # 将结果按行分割成列表


   list_temperature_vec = []

   # 遍历结果行,查找符合正则表达式的内容

   for j in list_str:

       regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)

       if len(re.findall(r"Interface", j)) > 0:

           new_columns = list_find_str = re.split(r'\s+', j)  # 提取列名

           new_columns = new_columns[0:6]


       if len(regex.findall(j)) > 0:

           list_find_str = regex.findall(j)[0]

           list_find_str = re.split(r'\s+', list_find_str)  # 提取数据行

           list_temperature_vec.append(list_find_str)


   pd_result = pd.DataFrame(list_temperature_vec)

   pd_result.columns = new_columns  # 设置 DataFrame 列名

   pd_result.to_csv('./' + os.sep + r'interface_brief' + '-' + str(hostname) + str(datetime.now()) + '.csv',

                    encoding='gb18030')  # 保存结果

   return pd_result  # 返回结果 DataFrame



# 示例调用:光功率2:执行 "show opticalinfo xgei-0/3/0/2" 命令

# 这个命令还得现场看下,可以参考show interface xgei-0/2/0/13

# def execute_opticalinfo_xgei(tn, hostname):




# 示例调用:执行 "show ip interface brief" 命令,类似上面的函数

def execute_ip_interface_brief(tn, hostname):

   command = bytes("show ip interface brief", encoding='utf-8')

   tn.write(command + b'\r\n')


   result_list = []

   # 持续读取结果

   while True:

       command_result = tn.read_very_eager().decode('ascii')

       result_list.append(command_result)

       if re.findall(r"--More--", command_result.strip()):

           tn.write(b" ")

       elif re.findall(r"#", command_result.strip()):

           break

       else:

           time.sleep(1)

           continue


   result_str = "\n".join(result_list)

   list_str = result_str.split('\n')


   list_temperature_vec = []

   for j in list_str:

       if len(re.findall(r"Interface", j)) > 0:

           new_columns = re.split(r'\s+', j)  # 提取列名

           print("new_columns111111111:",new_columns)

           new_columns = new_columns[0:6]


       regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)

       print(regex)

       if len(regex.findall(j)) > 0:

           list_find_str = regex.findall(j)[0]

           list_find_str = re.split(r'\s+', list_find_str)

           list_temperature_vec.append(list_find_str)


   pd_result = pd.DataFrame(list_temperature_vec)

   pd_result.columns = new_columns  # 设置列名

   pd_result.to_csv('./' + os.sep + r'ip_interface_brief' + '-' + str(hostname) + '.csv', encoding='gb18030')  # 保存结果

   return pd_result



# 示例调用:执行 "show interface xgei-0/2/0/13" 命令,获取接口的详细信息

def execute_show_interface_xgei(tn, hostname):

   command = bytes("show interface xgei-0/2/0/2", encoding='utf-8')

   tn.write(command + b'\r\n')


   result_list = []

   # 持续读取结果

   while True:

       command_result = tn.read_very_eager().decode('ascii')

       result_list.append(command_result)

       if re.findall(r"--More--", command_result.strip()):

           tn.write(b" ")

       elif re.findall(r"#", command_result.strip()):

           break

       else:

           time.sleep(1)

           continue


   result_str = "\n".join(result_list)


   # 将结果存储为字典

   dict_output = {}

   dict_output["time"] = datetime.now()

   dict_output["host_ip"] = hostname

   dict_output['interface'] = re.search(r'\w+gei-\d\/\d\/\d\/\d+', result_str).group(0)

   dict_output['In_Bytes'] = re.search(r'In_Bytes\s+(\d+)', result_str).group(1)

   dict_output['E_Bytes'] = re.search(r'E_Bytes\s+(\d+)', result_str).group(1)


   # 保存结果为 CSV 文件

   pd_output = pd.DataFrame.from_dict([dict_output])

   # pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', mode='a', index=None, encoding='gb18030')

   pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', index=None, encoding='gb18030')


def excute_ping(tn, hostname):

   command = bytes("ping 127.0.0.1", encoding='utf-8')

   tn.write(command + b'\r\n')

   time.sleep(5)

   result_list = []

   while True:

       command_result = tn.read_very_eager().decode('ascii')

       result_list.append(command_result)

       if re.findall(r"--More--", command_result.strip()):

           tn.write(b" ")

       elif re.findall(r"#", command_result.strip()):

           break

       else:

           time.sleep(1)

           continue

   # time.sleep(20)

   result_str = "\n".join(result_list)

   # print(result_str)


   # 解析结果并存储

   dict_output = {}

   dict_output["host_ip"] = hostname

   dict_output["info"] = result_str

   pd_output = pd.DataFrame.from_dict([dict_output])


   pd_output.to_csv('./' + os.sep + r'ping' + '-' + 'hostname' + str(datetime.now()) + '.csv', mode='a', index=None, encoding='gb18030')



def files_to_one():



   file_list = glob.glob(os.path.join(os.path.abspath('./'), r'ospf*.csv'))

   pd_data_o = pd.DataFrame()

   for file in file_list:

       try:

           pd_sheet = pd.read_csv(file, encoding='gb18030', doublequote=False,

                                  converters={u'code': str}, engine="python")

       except:

           print('读取异常')

       pd_data_o = pd.concat([pd_data_o, pd_sheet], axis=0)

       pd_data_o.to_csv('./' + os.sep + r'ospf' + '.csv', index=None, encoding='gb18030')





if __name__ == '__main__':


   '''

   正则表达式:https://regex101.com/

   '''

   # hostname 列表,包含多个主机 IP 地址

   # hostnames = ['127.0.0.1', '127.0.0.1', '127.0.0.1']  # 这是一个示例列表

   hostnames = ['127.0.0.1']  # 这是一个示例列表

   # hostnames = ['127.0.0.1', '127.0.0.1']  # 这是一个示例列表

   username = 'huawei'

   password = 'huawei'


   # 遍历每个 hostname,依次进行 Telnet 连接和命令执行

   for hostname in hostnames:

       print(f"正在连接 {hostname} ...")


       # 连接 Telnet

       tn = connect_telnet(hostname, username, password)


       # 如果连接成功,执行命令

       if tn:

           print(f"连接成功 {hostname}, 执行命令中...")


           # 执行 show ip ospf neighbor detail 命令 ,信息为excel形式,每个主机只有一行,做一个表格合并就行files_to_one(),不需要做可视化了

           # execute_ip_ospf_neighbor_detail(tn, hostname)


           # 可以根据需要取消注释执行其他命令

           # 返回的数据为二层接口的状态

           # execute_interface_brief(tn, hostname)



           # execute_ip_interface_brief(tn, hostname)

           execute_show_interface_xgei(tn, hostname)

           # excute_ping(tn,hostname)


           # 这三个可参考excute_ping()方法进行处理

           # execute_show_opentical(tn, hostname)

           # excute_show_runningconfig(tn,hostname)

           # excute_show_alarm_current(tn,hostname)


           # 关闭 Telnet 连接

           tn.close()

           print(f"{hostname} 的操作已完成,连接关闭。\n")

       else:

           print(f"无法连接到 {hostname},请检查连接或主机状态。\n")




   # 合并文件

   # files_to_one()



####################

import re

import time

import pandas as pd

import matplotlib.pyplot as plt

import seaborn as sns

import scipy.stats as stats

from datetime import datetime

import telnetlib

import os


# 采集接口流量数据并保存到CSV文件

def execute_show_interface_xgei(tn, hostname, interval=5, num_samples=10):

   all_data = []  # 用于存储每次采集的数据


   for i in range(num_samples):

       command = bytes("show interface xgei-0/2/0/2", encoding='utf-8')

       tn.write(command + b'\r\n')


       result_list = []

       # 持续读取结果

       while True:

           command_result = tn.read_very_eager().decode('ascii')

           result_list.append(command_result)

           if re.findall(r"--More--", command_result.strip()):

               tn.write(b" ")

           elif re.findall(r"#", command_result.strip()):

               break

           else:

               time.sleep(1)

               continue


       result_str = "\n".join(result_list)


       # 将结果存储为字典

       dict_output = {}

       dict_output["time"] = datetime.now()

       dict_output["host_ip"] = hostname

       dict_output['interface'] = re.search(r'\w+gei-\d\/\d\/\d\/\d+', result_str).group(0)

       dict_output['In_Bytes'] = re.search(r'In_Bytes\s+(\d+)', result_str).group(1)

       dict_output['E_Bytes'] = re.search(r'E_Bytes\s+(\d+)', result_str).group(1)


       # 将数据添加到总列表中

       all_data.append(dict_output)


       # 每隔 interval 秒采集一次

       time.sleep(interval)


   # 保存结果为 CSV 文件

   pd_output = pd.DataFrame.from_dict(all_data)

   # 确保文件夹存在

   # if not os.path.exists('./data'):

   #     os.makedirs('./data')

   # pd_output.to_csv('./show_interface_xgei.csv', mode='a', index=None, encoding='gb18030')

   pd_output.to_csv('./show_interface_xgei.csv', mode='a', index=None, encoding='gb18030')



# 从CSV文件中读取数据并进行图形化展示

def plot_traffic_data(csv_file='./show_interface_xgei.csv'):

   try:

       # 从CSV文件中读取数据

       data = pd.read_csv(csv_file)


       # 转换时间列为datetime类型

       # data['time'] = pd.to_datetime(data['time'], format='%Y-%m-%d %H:%M:%S')

       data['time'] = pd.to_datetime(data['time'], format='mixed')

       data.sort_values('time', inplace=True)  # 按时间排序

       data.reset_index(level=None, drop=True, inplace=True)


       # 计算字节差分值

       data['In_Bytes'] = data['In_Bytes'].diff(1)

       data['E_Bytes'] = data['E_Bytes'].diff(1)


       # 删除含有缺失值的行

       data.dropna(axis=0, how='any', inplace=True)


   except Exception as e:

       print(e)

       return


   # 绘制流量变化趋势图

   plt.figure(figsize=(10, 6))

   plt.plot(data['time'], data['In_Bytes'], label='In_Bytes', marker='o')

   plt.plot(data['time'], data['E_Bytes'], label='E_Bytes', marker='o')

   plt.title('Interface Traffic Over Time')

   plt.xlabel('Time')

   plt.ylabel('Bytes')

   plt.legend()

   plt.grid(True)

   plt.show()


   # 正态分布图和统计分析

   for i in ['In_Bytes', 'E_Bytes']:

       X = data[i].astype(float)


       # 绘制直方图

       plt.figure(figsize=(10, 6))

       plt.title(i + ' Histogram')

       plt.hist(X, bins=50)

       plt.show()


       # 绘制概率密度图

       plt.figure(figsize=(10, 6))

       plt.title(i + ' Probability Density')

       sns.kdeplot(X, kernel='gau', color="g", alpha=.7)

       plt.show()


       # 偏度和峰度计算

       print(f"{i} Skewness:", stats.skew(X))

       print(f"{i} Kurtosis:", stats.kurtosis(X))


   # 计算协方差和相关系数

   covariance = data['In_Bytes'].cov(data['E_Bytes'])

   correlation = data['In_Bytes'].corr(data['E_Bytes'])


   print("Covariance:", covariance)

   print("Correlation:", correlation)



if __name__ == '__main__':

   # 示例Telnet连接信息

   hostname = '127.0.0.1'

   username = 'huawei'

   password = 'huawei'


   try:

       # 连接Telnet

       tn = telnetlib.Telnet(hostname)

       tn.read_until(b'Username:')

       tn.write(username.encode('ascii') + b'\n')

       tn.read_until(b'Password:')

       tn.write(password.encode('ascii') + b'\n')


       # 执行采集命令,每隔5秒采集一次,连续采集10次

       # while True: 如果注释打开,记得缩进两个空格

       execute_show_interface_xgei(tn, hostname, interval=5, num_samples=10)

       tn.close()


       # 绘制流量图形化展示

       # plot_traffic_data()


   except Exception as e:

       print(f"Error: {e}")

举报

相关推荐

0 条评论