0
点赞
收藏
分享

微信扫一扫

使用python对交换机进行ping可视化

Ad大成 2024-10-11 阅读 12

import glob

import telnetlib

import re

from datetime import datetime

from time import sleep


import pandas as pd

import os

import time

import matplotlib

matplotlib.use('Agg')  # Non-GUI backend

from matplotlib import pyplot as plt





# Telnet 连接函数

def connect_telnet(hostname, username, password):
    """Open a Telnet session to *hostname* on port 23 and log in.

    Args:
        hostname: Address of the device to connect to.
        username: Login name, sent after the ``Username:`` prompt.
        password: Password, sent after the ``Password:`` prompt.

    Returns:
        The logged-in ``telnetlib.Telnet`` object, or ``None`` when the
        connection or the login fails. On every failure path the socket
        is closed before returning.
    """
    tn = telnetlib.Telnet()
    try:
        tn.open(hostname, port=23, timeout=5)
        print("connected......", hostname)

        # Send the user name once the prompt shows up (or the wait times out).
        tn.read_until(b'Username:', timeout=5)
        tn.write(username.encode('ascii') + b'\n')

        # Send the password the same way.
        tn.read_until(b'Password:', timeout=5)
        tn.write(password.encode('ascii') + b'\n')

        # A '#' prompt arriving within the timeout counts as a successful login.
        login_result = tn.read_until(b'#', timeout=5)
    except Exception as e:
        print(f"连接失败: {e}")
        # Release the socket if the failure happened after open() succeeded.
        tn.close()
        return None

    if b'#' not in login_result:
        print('登录失败!', hostname)
        tn.close()
        return None

    print('登录成功!', hostname)
    return tn



# 执行命令的函数:执行 Telnet 下的相关命令并返回结果

def execute_ip_ospf_neighbor_detail(tn, hostname):
    """Record the device clock and OSPF neighbor details in a CSV file.

    Sends ``show clock`` and ``show ip ospf neighbor detail`` over the open
    session, extracts a few fields with regular expressions and writes one
    row to ``ospf_neighbor_detail-<hostname>-<timestamp>.csv`` in the current
    directory.

    Args:
        tn: Logged-in session object providing ``write``, ``read_until`` and
            ``read_very_eager``.
        hostname: Device address; stored in the ``host_ip`` column and used
            in the output file name.

    Returns:
        None. A field that cannot be found in the device output is stored
        as an empty value instead of aborting the collection.
    """
    def first_group(pattern, text):
        # Return the first capture group, stripped, or None when not found.
        match = re.search(pattern, text)
        return match.group(1).strip() if match else None

    # Command 1: device clock. Any four-digit year is accepted.
    tn.write(b"show clock" + b'\r\n')
    clock_text = tn.read_until(b'#').decode('GB18030', errors='replace')
    device_time = first_group(
        r"(\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+\d{4})", clock_text)

    # Command 2: OSPF neighbor details, read page by page.
    tn.write(b"show ip ospf neighbor detail" + b'\r\n')
    chunks = []
    while True:
        chunk = tn.read_very_eager().decode('ascii', errors='replace')
        chunks.append(chunk)
        if "--More--" in chunk:
            tn.write(b" ")  # ask the device for the next page
        elif "#" in chunk:
            break  # prompt is back: the command has finished
        else:
            time.sleep(1)

    # Chunks are raw slices of one stream, so they are joined without a
    # separator; inserting newlines could cut a line in half.
    result_str = "".join(chunks)

    dict_output = {
        "host_ip": hostname,
        "time": device_time,
        "OSPF Router with ID": first_group(r'OSPF Router with ID (.+)', result_str),
        "Neighbor": first_group(r'Neighbor\s+(\d+\.\d+\.\d+\.\d+)', result_str),
        "area": first_group(r'In the area\s+(.+)', result_str),
        "State": first_group(r'State\s+(\w+), ', result_str),
    }

    pd_output = pd.DataFrame([dict_output])
    pd_output['time'] = pd.to_datetime(pd_output['time'])
    # .dt.strftime leaves a missing clock value empty instead of raising.
    pd_output['time'] = pd_output['time'].dt.strftime('%Y-%m-%d %H:%M:%S')

    # The timestamp avoids ':' and spaces so the name is valid on every OS.
    stamp = datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    output_path = os.path.join('.', f'ospf_neighbor_detail-{hostname}-{stamp}.csv')
    pd_output.to_csv(output_path, index=False, encoding='gb18030')


# 示例调用:执行 "show interface brief" 命令

def execute_interface_brief(tn, hostname):
    """Run ``show interface brief`` and return the ``*gei`` rows as a table.

    The paged output is collected, the line containing ``Interface`` supplies
    the column names (first 8 tokens) and every line matching the interface
    pattern becomes one row. The table is also written to
    ``interface_brief.csv`` in the current directory.

    Args:
        tn: Logged-in session object providing ``write`` and
            ``read_very_eager``.
        hostname: Device address (not used by this function).

    Returns:
        pandas.DataFrame with one row per matched interface line. Rows are
        cut or padded to the header width; when no header line is found the
        default integer column labels are kept.
    """
    max_columns = 8
    # Compiled once: a '*gei' interface name followed by further fields.
    row_pattern = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)

    tn.write(b"show interface brief" + b'\r\n')

    chunks = []
    while True:
        chunk = tn.read_very_eager().decode('ascii', errors='replace')
        chunks.append(chunk)
        if "--More--" in chunk:
            tn.write(b" ")  # ask the device for the next page
        elif "#" in chunk:
            break  # prompt is back: the command has finished
        else:
            time.sleep(1)

    # Join without a separator so a line split across two reads stays whole.
    result_str = "".join(chunks)

    columns = None
    rows = []
    for line in result_str.split('\n'):
        if "Interface" in line:
            # str.split() drops the empty fields caused by '\r' or padding.
            columns = line.split()[:max_columns]
        match = row_pattern.search(line)
        if match:
            rows.append(match.group(0).split())

    if columns:
        width = len(columns)
        # Rows follow the header: extra fields are cut, missing ones padded.
        rows = [(row + [None] * width)[:width] for row in rows]
        pd_result = pd.DataFrame(rows, columns=columns)
    else:
        pd_result = pd.DataFrame(rows)

    pd_result.to_csv('./' + os.sep + r'interface_brief' + '.csv', encoding='gb18030')
    return pd_result



# 示例调用:光功率1:执行 "show opticalinfo brief" 命令

def execute_opticalinfo_brief(tn, hostname):
    """Run ``show opticalinfo brief`` and return the ``*gei`` rows as a table.

    The paged output is collected, the line containing ``Interface`` supplies
    the column names (first 6 tokens) and every line matching the interface
    pattern becomes one row. The table is also written to
    ``opticalinfo_brief-<hostname>-<timestamp>.csv`` in the current directory.

    Args:
        tn: Logged-in session object providing ``write`` and
            ``read_very_eager``.
        hostname: Device address; used in the output file name.

    Returns:
        pandas.DataFrame with one row per matched interface line. Rows are
        cut or padded to the header width; when no header line is found the
        default integer column labels are kept.
    """
    max_columns = 6
    # Compiled once: a '*gei' interface name followed by further fields.
    row_pattern = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)

    tn.write(b"show opticalinfo brief" + b'\r\n')

    chunks = []
    while True:
        chunk = tn.read_very_eager().decode('ascii', errors='replace')
        chunks.append(chunk)
        if "--More--" in chunk:
            tn.write(b" ")  # ask the device for the next page
        elif "#" in chunk:
            break  # prompt is back: the command has finished
        else:
            time.sleep(1)

    # Join without a separator so a line split across two reads stays whole.
    result_str = "".join(chunks)

    columns = None
    rows = []
    for line in result_str.split('\n'):
        if "Interface" in line:
            # str.split() drops the empty fields caused by '\r' or padding.
            columns = line.split()[:max_columns]
        match = row_pattern.search(line)
        if match:
            rows.append(match.group(0).split())

    if columns:
        width = len(columns)
        # Rows follow the header: extra fields are cut, missing ones padded.
        rows = [(row + [None] * width)[:width] for row in rows]
        pd_result = pd.DataFrame(rows, columns=columns)
    else:
        pd_result = pd.DataFrame(rows)

    # Named after the command that produced it; the timestamp avoids ':' and
    # spaces so the file name is valid on every OS.
    stamp = datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    output_path = os.path.join('.', f'opticalinfo_brief-{hostname}-{stamp}.csv')
    pd_result.to_csv(output_path, encoding='gb18030')
    return pd_result



# 示例调用:光功率2:执行 "show opticalinfo xgei-0/3/0/2" 命令

# 这个命令还得现场看下,可以参考show interface xgei-0/2/0/13

# def execute_opticalinfo_xgei(tn, hostname):




# 示例调用:执行 "show ip interface brief" 命令,类似上面的函数

def execute_ip_interface_brief(tn, hostname):
    """Run ``show ip interface brief`` and return the ``*gei`` rows as a table.

    The paged output is collected, the line containing ``Interface`` supplies
    the column names (first 6 tokens) and every line matching the interface
    pattern becomes one row. The table is also written to
    ``ip_interface_brief-<hostname>.csv`` in the current directory.

    Args:
        tn: Logged-in session object providing ``write`` and
            ``read_very_eager``.
        hostname: Device address; used in the output file name.

    Returns:
        pandas.DataFrame with one row per matched interface line. Rows are
        cut or padded to the header width; when no header line is found the
        default integer column labels are kept.
    """
    max_columns = 6
    # Compiled once: a '*gei' interface name followed by further fields.
    row_pattern = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)

    tn.write(b"show ip interface brief" + b'\r\n')

    chunks = []
    while True:
        chunk = tn.read_very_eager().decode('ascii', errors='replace')
        chunks.append(chunk)
        if "--More--" in chunk:
            tn.write(b" ")  # ask the device for the next page
        elif "#" in chunk:
            break  # prompt is back: the command has finished
        else:
            time.sleep(1)

    # Join without a separator so a line split across two reads stays whole.
    result_str = "".join(chunks)

    columns = None
    rows = []
    for line in result_str.split('\n'):
        if "Interface" in line:
            # str.split() drops the empty fields caused by '\r' or padding.
            columns = line.split()[:max_columns]
        match = row_pattern.search(line)
        if match:
            rows.append(match.group(0).split())

    if columns:
        width = len(columns)
        # Rows follow the header: extra fields are cut, missing ones padded.
        rows = [(row + [None] * width)[:width] for row in rows]
        pd_result = pd.DataFrame(rows, columns=columns)
    else:
        pd_result = pd.DataFrame(rows)

    pd_result.to_csv('./' + os.sep + r'ip_interface_brief' + '-' + str(hostname) + '.csv', encoding='gb18030')
    return pd_result



# 示例调用:执行 "show interface xgei-0/2/0/2" 命令,获取接口的详细信息

def execute_show_interface_xgei(tn, hostname, interface="xgei-0/2/0/2"):
    """Run ``show interface <interface>`` and store its byte counters as CSV.

    One row holding the local collection time, the device address, the
    interface name found in the output and the ``In_Bytes`` / ``E_Bytes``
    counters is written to ``show_interface_xgei.csv`` in the current
    directory, replacing any previous file.

    Args:
        tn: Logged-in session object providing ``write`` and
            ``read_very_eager``.
        hostname: Device address; stored in the ``host_ip`` column.
        interface: Name of the interface to query. Defaults to
            ``xgei-0/2/0/2``.

    Returns:
        None. A value that cannot be found in the device output is stored
        as an empty field instead of aborting the collection.
    """
    tn.write(bytes(f"show interface {interface}", encoding='utf-8') + b'\r\n')

    chunks = []
    while True:
        chunk = tn.read_very_eager().decode('ascii', errors='replace')
        chunks.append(chunk)
        if "--More--" in chunk:
            tn.write(b" ")  # ask the device for the next page
        elif "#" in chunk:
            break  # prompt is back: the command has finished
        else:
            time.sleep(1)

    # Join without a separator so a line split across two reads stays whole.
    result_str = "".join(chunks)

    def find(pattern, group):
        # Return the requested match group, or None when nothing matches.
        match = re.search(pattern, result_str)
        return match.group(group) if match else None

    dict_output = {
        "time": datetime.now(),
        "host_ip": hostname,
        # Multi-digit shelf/slot/card numbers are accepted as well.
        "interface": find(r'\w+gei-\d+/\d+/\d+/\d+', 0),
        "In_Bytes": find(r'In_Bytes\s+(\d+)', 1),
        "E_Bytes": find(r'E_Bytes\s+(\d+)', 1),
    }

    pd_output = pd.DataFrame([dict_output])
    pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', index=False, encoding='gb18030')


def excute_ping(tn, hostname, target="129.201.0.1", initial_wait=5):
    """Ping *target* from the device and return the parsed result.

    The ``ping`` command is sent over the open session, the output is read
    until the prompt returns, and the success rate and the number of
    successful echoes are extracted. The row is also saved to
    ``ping_results-<hostname>-<timestamp>.csv`` in the current directory.

    Args:
        tn: Logged-in session object providing ``write`` and
            ``read_very_eager``.
        hostname: Address of the device running the ping; stored in the
            ``host_ip`` column and used in the output file name.
        target: Address to ping. Defaults to ``129.201.0.1``.
        initial_wait: Seconds to wait after sending the command before the
            first read. Defaults to 5.

    Returns:
        pandas.DataFrame with a single row and the columns ``host_ip``,
        ``success_rate``, ``successful_pings`` and ``ping_result``. Both
        numbers are 0 when the expected text is not found in the output.
    """
    tn.write(bytes(f"ping {target}", encoding='utf-8') + b'\r\n')
    if initial_wait:
        time.sleep(initial_wait)

    chunks = []
    while True:
        chunk = tn.read_very_eager().decode('ascii', errors='replace')
        chunks.append(chunk)
        if "--More--" in chunk:
            tn.write(b" ")  # ask the device for the next page
        elif "#" in chunk:
            break  # prompt is back: the command has finished
        else:
            time.sleep(1)

    # Join without a separator: a newline inserted between two reads could
    # split the "Success rate" line and make a good ping look like a failure.
    result_str = "".join(chunks)

    success_rate_match = re.search(r'Success rate is (\d+) percent', result_str)
    success_rate = int(success_rate_match.group(1)) if success_rate_match else 0

    # "(received/sent)" pair; the first number is the successful count.
    total_pings_match = re.search(r'\((\d+)/\d+\)', result_str)
    successful_pings = int(total_pings_match.group(1)) if total_pings_match else 0

    pd_output = pd.DataFrame([{
        "host_ip": hostname,
        "success_rate": success_rate,
        "successful_pings": successful_pings,
        "ping_result": result_str,
    }])

    output_filename = f'ping_results-{hostname}-{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
    pd_output.to_csv(output_filename, mode='a', index=False, encoding='utf-8')

    print(f"Ping result saved to {output_filename}")
    return pd_output


def visualize_ping_results(ping_df, output_path='plot3333.png'):
    """Save a horizontal bar chart of the ping success rate per host.

    Args:
        ping_df: DataFrame with the columns ``host_ip`` and ``success_rate``.
        output_path: File the chart is written to. Defaults to
            ``plot3333.png``.

    Returns:
        None. When *ping_df* is ``None`` or has no rows, no chart is written.
    """
    if ping_df is None or ping_df.empty:
        # Nothing was collected (e.g. every device was unreachable).
        print("No ping results to visualize.")
        return

    ordered = ping_df.sort_values(by='success_rate', ascending=False)

    fig = plt.figure(figsize=(10, 6))
    try:
        plt.barh(ordered['host_ip'], ordered['success_rate'], color='steelblue')

        plt.xlabel('Success Rate (%)')
        plt.ylabel('Host IP')
        plt.title('Ping Success Rates per Host')
        plt.xlim(0, 100)  # success rate is a percentage

        plt.savefig(output_path)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)


def files_to_one():
    """Merge every ``ospf*.csv`` file of the current directory into ``ospf.csv``.

    The merged file itself is left out of the inputs, so running the function
    again does not duplicate rows. A file that cannot be read is reported and
    skipped. Nothing is written when no input file could be read.

    Returns:
        None.
    """
    base_dir = os.path.abspath('./')
    output_path = os.path.join(base_dir, 'ospf.csv')

    frames = []
    # Sorted so the row order of the merged file is reproducible.
    for file in sorted(glob.glob(os.path.join(base_dir, r'ospf*.csv'))):
        if os.path.abspath(file) == output_path:
            continue  # the previous merge result also matches the pattern
        try:
            pd_sheet = pd.read_csv(file, encoding='gb18030', doublequote=False,
                                   converters={u'code': str}, engine="python")
        except (OSError, ValueError) as exc:
            # pandas parser/empty-file errors and decoding errors derive
            # from ValueError.
            print('读取异常', file, exc)
            continue
        frames.append(pd_sheet)

    if frames:
        merged = pd.concat(frames, axis=0)
        merged.to_csv(output_path, index=False, encoding='gb18030')



if __name__ == '__main__':
    # Script entry point: ping from every reachable device, then chart the
    # success rates.
    hostnames = ['129.201.0.1', '127.0.0.1']  # Add your host IPs here
    username = 'huawei'
    password = 'huawei'

    # One single-row DataFrame per device that answered.
    ping_frames = []
    for hostname in hostnames:
        tn = connect_telnet(hostname, username, password)
        if tn is None:
            continue  # connection or login failed; already reported
        try:
            ping_frames.append(excute_ping(tn, hostname))
        finally:
            # Always release the session, even when the ping step raises.
            tn.close()

    if ping_frames:
        all_ping_results = pd.concat(ping_frames, ignore_index=True)
        visualize_ping_results(all_ping_results)
    else:
        # No device was reachable, so there is nothing to plot.
        all_ping_results = pd.DataFrame()
        print("No ping results collected; skipping visualization.")



举报

相关推荐

0 条评论