import glob
import telnetlib
import re
from datetime import datetime
from time import sleep
import pandas as pd
import os
import time
import matplotlib
matplotlib.use('Agg') # Non-GUI backend
from matplotlib import pyplot as plt
# Telnet 连接函数
def connect_telnet(hostname, username, password, port=23, timeout=5):
    """Open a Telnet session to *hostname* and log in.

    Args:
        hostname: Address of the device to connect to.
        username: Login name; must be ASCII-encodable.
        password: Login password; must be ASCII-encodable.
        port: TCP port of the Telnet service (default 23).
        timeout: Seconds to wait for the connection and for each prompt.

    Returns:
        The logged-in ``telnetlib.Telnet`` object, or ``None`` when the
        connection or the login failed. On failure the socket is closed.

    Note:
        ``telnetlib`` is deprecated since Python 3.11 and removed in 3.13
        (PEP 594).
    """
    tn = telnetlib.Telnet()
    try:
        tn.open(hostname, port=port, timeout=timeout)
        print("connected......", hostname)
        # Send the user name once the device asks for it.
        tn.read_until(b'Username:', timeout=timeout)
        tn.write(username.encode('ascii') + b'\n')
        # Send the password once the device asks for it.
        tn.read_until(b'Password:', timeout=timeout)
        tn.write(password.encode('ascii') + b'\n')
        # A '#' prompt is taken as proof of a successful login.
        login_result = tn.read_until(b'#', timeout=timeout)
    except (OSError, EOFError, UnicodeError) as e:
        print(f"连接失败: {e}")
        tn.close()  # do not leak a half-open socket
        return None

    if b'#' not in login_result:
        print('登录失败!', hostname)
        tn.close()
        return None

    print('登录成功!', hostname)
    return tn
# 执行命令的函数:执行 Telnet 下的相关命令并返回结果
def execute_ip_ospf_neighbor_detail(tn, hostname, *, idle_timeout=30.0):
    """Record the device clock and the first OSPF neighbor entry in a CSV file.

    Runs ``show clock`` and ``show ip ospf neighbor detail`` on the session,
    answers ``--More--`` pager prompts with a space, and extracts the router
    ID, the first neighbor address, its area and its state.

    Args:
        tn: Logged-in Telnet session (needs ``write``, ``read_until`` and
            ``read_very_eager``).
        hostname: Device address; stored in the ``host_ip`` column and used
            in the output file name.
        idle_timeout: Seconds without new data before giving up.

    Returns:
        A one-row DataFrame with the columns ``host_ip``, ``time``,
        ``OSPF Router with ID``, ``Neighbor``, ``area`` and ``State``.
        A field that is not present in the device output is ``None``.

    Raises:
        TimeoutError: The prompt did not come back within *idle_timeout*
            seconds of the last received data.
    """
    # --- device clock -----------------------------------------------------
    tn.write(b"show clock\r\n")
    clock_text = tn.read_until(b'#', timeout=idle_timeout).decode(
        'GB18030', errors='replace')
    # Expected layout: "<HH:MM:SS> <zone> <weekday> <month> <day> <year>".
    # Any four-digit year is accepted.
    clock = re.search(
        r"(\d+:\d+:\d+)\s+\w+\s+\w+\s+(\w+)\s+(\d+)\s+(\d{4})", clock_text)
    device_time = None
    if clock:
        hms, month, day, year = clock.groups()
        for month_format in ("%b", "%B"):  # abbreviated or full month name
            try:
                parsed = datetime.strptime(
                    f"{month} {day} {year} {hms}",
                    f"{month_format} %d %Y %H:%M:%S")
            except ValueError:
                continue
            device_time = parsed.strftime('%Y-%m-%d %H:%M:%S')
            break

    # --- OSPF neighbor detail, page by page -------------------------------
    tn.write(b"show ip ospf neighbor detail\r\n")
    raw = b""
    answered = 0  # offset just past the last pager marker already handled
    last_data = time.monotonic()
    while True:
        chunk = tn.read_very_eager()
        if chunk:
            raw += chunk
            last_data = time.monotonic()
        pager_at = raw.rfind(b"--More--", answered)
        if pager_at != -1:
            answered = pager_at + len(b"--More--")
            if b"#" not in raw[answered:]:
                tn.write(b" ")  # ask for the next page
                continue
        if b"#" in raw[answered:]:
            break  # prompt is back: the command has finished
        if time.monotonic() - last_data > idle_timeout:
            raise TimeoutError(
                f"no prompt from {hostname} after {idle_timeout} s of silence")
        time.sleep(1)
    # Decode once so a multi-byte character split across reads stays intact.
    text = raw.decode('GB18030', errors='replace')

    def _first_match(pattern):
        """Return the stripped first capture group of *pattern*, or None."""
        found = re.search(pattern, text)
        return found.group(1).strip() if found else None

    record = {
        "host_ip": hostname,
        "time": device_time,
        "OSPF Router with ID": _first_match(r'OSPF Router with ID (.+)'),
        "Neighbor": _first_match(r'Neighbor\s+(\d+\.\d+\.\d+\.\d+)'),
        "area": _first_match(r'In the area\s+(.+)'),
        "State": _first_match(r'State\s+(\w+), '),
    }
    pd_output = pd.DataFrame([record])

    # File-system-safe timestamp (no ':' or spaces); the name still starts
    # with "ospf" and ends with ".csv".
    stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    csv_path = os.path.join('.', f'ospf_neighbor_detail-{hostname}-{stamp}.csv')
    pd_output.to_csv(csv_path, index=False, encoding='gb18030')
    return pd_output
# 示例调用:执行 "show interface brief" 命令
def execute_interface_brief(tn, hostname, *, idle_timeout=30.0):
    """Run ``show interface brief`` and tabulate the ``*gei`` interface rows.

    ``--More--`` pager prompts are answered with a space. The header is the
    last line containing ``Interface`` (at most 8 column names are kept).
    Each data row is split on whitespace into exactly as many fields as
    there are column names: the last column absorbs any remaining text (for
    example a description with spaces) and short rows are padded with
    ``None``.

    Args:
        tn: Logged-in Telnet session (needs ``write`` and ``read_very_eager``).
        hostname: Device address; only used in error messages.
        idle_timeout: Seconds without new data before giving up.

    Returns:
        DataFrame with one row per matched interface line. It is also
        written to ``./interface_brief.csv`` (gb18030, index included).
        If no header line was seen, the columns are numbered.

    Raises:
        TimeoutError: The prompt did not come back within *idle_timeout*
            seconds of the last received data.
    """
    tn.write(b"show interface brief\r\n")

    raw = b""
    answered = 0  # offset just past the last pager marker already handled
    last_data = time.monotonic()
    while True:
        chunk = tn.read_very_eager()
        if chunk:
            raw += chunk
            last_data = time.monotonic()
        pager_at = raw.rfind(b"--More--", answered)
        if pager_at != -1:
            answered = pager_at + len(b"--More--")
            if b"#" not in raw[answered:]:
                tn.write(b" ")  # ask for the next page
                continue
        if b"#" in raw[answered:]:
            break  # prompt is back: the command has finished
        if time.monotonic() - last_data > idle_timeout:
            raise TimeoutError(
                f"no prompt from {hostname} after {idle_timeout} s of silence")
        time.sleep(1)
    # Decode once so a multi-byte character split across reads stays intact.
    text = raw.decode('gb18030', errors='replace')

    # NOTE(review): `\w+gei` needs at least one word character before "gei",
    # so e.g. "xgei-..." matches but a bare "gei-..." does not - confirm
    # that this is intended.
    row_pattern = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
    columns = None
    matched_rows = []
    for line in text.split('\n'):
        if "Interface" in line:
            columns = line.split()[:8]
        found = row_pattern.search(line)
        if found:
            # strip() drops the trailing '\r' that would otherwise become an
            # extra empty field.
            matched_rows.append(found.group(0).strip())

    if columns:
        width = len(columns)
        records = []
        for row in matched_rows:
            fields = row.split(None, width - 1)
            fields += [None] * (width - len(fields))
            records.append(fields)
        pd_result = pd.DataFrame(records, columns=columns)
    else:
        # No header seen: keep the data under default numeric column labels.
        pd_result = pd.DataFrame([row.split() for row in matched_rows])

    pd_result.to_csv(os.path.join('.', 'interface_brief.csv'), encoding='gb18030')
    return pd_result
# 示例调用:光功率1:执行 "show opticalinfo brief" 命令
def execute_opticalinfo_brief(tn, hostname, *, idle_timeout=30.0):
    """Run ``show opticalinfo brief`` and tabulate the ``*gei`` interface rows.

    ``--More--`` pager prompts are answered with a space. The header is the
    last line containing ``Interface`` (at most 6 column names are kept).
    Each data row is split on whitespace into exactly as many fields as
    there are column names: the last column absorbs any remaining text and
    short rows are padded with ``None``.

    Args:
        tn: Logged-in Telnet session (needs ``write`` and ``read_very_eager``).
        hostname: Device address; used in the output file name.
        idle_timeout: Seconds without new data before giving up.

    Returns:
        DataFrame with one row per matched interface line. It is also
        written to ``./opticalinfo_brief-<hostname>-<YYYYmmdd-HHMMSS>.csv``
        (gb18030, index included). If no header line was seen, the columns
        are numbered.

    Raises:
        TimeoutError: The prompt did not come back within *idle_timeout*
            seconds of the last received data.
    """
    tn.write(b"show opticalinfo brief\r\n")

    raw = b""
    answered = 0  # offset just past the last pager marker already handled
    last_data = time.monotonic()
    while True:
        chunk = tn.read_very_eager()
        if chunk:
            raw += chunk
            last_data = time.monotonic()
        pager_at = raw.rfind(b"--More--", answered)
        if pager_at != -1:
            answered = pager_at + len(b"--More--")
            if b"#" not in raw[answered:]:
                tn.write(b" ")  # ask for the next page
                continue
        if b"#" in raw[answered:]:
            break  # prompt is back: the command has finished
        if time.monotonic() - last_data > idle_timeout:
            raise TimeoutError(
                f"no prompt from {hostname} after {idle_timeout} s of silence")
        time.sleep(1)
    # Decode once so a multi-byte character split across reads stays intact.
    text = raw.decode('gb18030', errors='replace')

    row_pattern = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
    columns = None
    matched_rows = []
    for line in text.split('\n'):
        if "Interface" in line:
            columns = line.split()[:6]
        found = row_pattern.search(line)
        if found:
            # strip() drops the trailing '\r' that would otherwise become an
            # extra empty field.
            matched_rows.append(found.group(0).strip())

    if columns:
        width = len(columns)
        records = []
        for row in matched_rows:
            fields = row.split(None, width - 1)
            fields += [None] * (width - len(fields))
            records.append(fields)
        pd_result = pd.DataFrame(records, columns=columns)
    else:
        # No header seen: keep the data under default numeric column labels.
        pd_result = pd.DataFrame([row.split() for row in matched_rows])

    # The file is named after this command (it used to be labelled
    # "interface_brief") and the timestamp avoids ':' and spaces, which are
    # not valid in file names on every platform.
    stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    csv_path = os.path.join('.', f'opticalinfo_brief-{hostname}-{stamp}.csv')
    pd_result.to_csv(csv_path, encoding='gb18030')
    return pd_result
# 示例调用:光功率2:执行 "show opticalinfo xgei-0/3/0/2" 命令
# 这个命令还得现场看下,可以参考show interface xgei-0/2/0/13
# def execute_opticalinfo_xgei(tn, hostname):
# 示例调用:执行 "show ip interface brief" 命令,类似上面的函数
def execute_ip_interface_brief(tn, hostname, *, idle_timeout=30.0):
    """Run ``show ip interface brief`` and tabulate the ``*gei`` interface rows.

    ``--More--`` pager prompts are answered with a space. The header is the
    last line containing ``Interface`` (at most 6 column names are kept).
    Each data row is split on whitespace into exactly as many fields as
    there are column names: the last column absorbs any remaining text and
    short rows are padded with ``None``.

    Args:
        tn: Logged-in Telnet session (needs ``write`` and ``read_very_eager``).
        hostname: Device address; used in the output file name.
        idle_timeout: Seconds without new data before giving up.

    Returns:
        DataFrame with one row per matched interface line. It is also
        written to ``./ip_interface_brief-<hostname>.csv`` (gb18030, index
        included). If no header line was seen, the columns are numbered.

    Raises:
        TimeoutError: The prompt did not come back within *idle_timeout*
            seconds of the last received data.
    """
    tn.write(b"show ip interface brief\r\n")

    raw = b""
    answered = 0  # offset just past the last pager marker already handled
    last_data = time.monotonic()
    while True:
        chunk = tn.read_very_eager()
        if chunk:
            raw += chunk
            last_data = time.monotonic()
        pager_at = raw.rfind(b"--More--", answered)
        if pager_at != -1:
            answered = pager_at + len(b"--More--")
            if b"#" not in raw[answered:]:
                tn.write(b" ")  # ask for the next page
                continue
        if b"#" in raw[answered:]:
            break  # prompt is back: the command has finished
        if time.monotonic() - last_data > idle_timeout:
            raise TimeoutError(
                f"no prompt from {hostname} after {idle_timeout} s of silence")
        time.sleep(1)
    # Decode once so a multi-byte character split across reads stays intact.
    text = raw.decode('gb18030', errors='replace')

    # Compiled once, outside the per-line loop.
    row_pattern = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
    columns = None
    matched_rows = []
    for line in text.split('\n'):
        if "Interface" in line:
            columns = line.split()[:6]
        found = row_pattern.search(line)
        if found:
            # strip() drops the trailing '\r' that would otherwise become an
            # extra empty field.
            matched_rows.append(found.group(0).strip())

    if columns:
        width = len(columns)
        records = []
        for row in matched_rows:
            fields = row.split(None, width - 1)
            fields += [None] * (width - len(fields))
            records.append(fields)
        pd_result = pd.DataFrame(records, columns=columns)
    else:
        # No header seen: keep the data under default numeric column labels.
        pd_result = pd.DataFrame([row.split() for row in matched_rows])

    csv_path = os.path.join('.', f'ip_interface_brief-{hostname}.csv')
    pd_result.to_csv(csv_path, encoding='gb18030')
    return pd_result
# Run "show interface xgei-0/2/0/2" and record the interface's In_Bytes / E_Bytes counters.
def execute_show_interface_xgei(tn, hostname, interface="xgei-0/2/0/2", *, idle_timeout=30.0):
    """Run ``show interface <interface>`` and record its byte counters.

    ``--More--`` pager prompts are answered with a space. The first
    ``*gei-a/b/c/d`` name found in the output and the first ``In_Bytes`` /
    ``E_Bytes`` values are stored together with the local time and host.

    Args:
        tn: Logged-in Telnet session (needs ``write`` and ``read_very_eager``).
        hostname: Device address; stored in the ``host_ip`` column.
        interface: Interface name passed to ``show interface``.
        idle_timeout: Seconds without new data before giving up.

    Returns:
        A one-row DataFrame with the columns ``time``, ``host_ip``,
        ``interface``, ``In_Bytes`` and ``E_Bytes`` (counters are the digit
        strings as printed by the device; a field that is not present is
        ``None``). It is also written to ``./show_interface_xgei.csv``.

    Raises:
        TimeoutError: The prompt did not come back within *idle_timeout*
            seconds of the last received data.
    """
    tn.write(f"show interface {interface}".encode('utf-8') + b'\r\n')

    raw = b""
    answered = 0  # offset just past the last pager marker already handled
    last_data = time.monotonic()
    while True:
        chunk = tn.read_very_eager()
        if chunk:
            raw += chunk
            last_data = time.monotonic()
        pager_at = raw.rfind(b"--More--", answered)
        if pager_at != -1:
            answered = pager_at + len(b"--More--")
            if b"#" not in raw[answered:]:
                tn.write(b" ")  # ask for the next page
                continue
        if b"#" in raw[answered:]:
            break  # prompt is back: the command has finished
        if time.monotonic() - last_data > idle_timeout:
            raise TimeoutError(
                f"no prompt from {hostname} after {idle_timeout} s of silence")
        time.sleep(1)
    # Decode once so a multi-byte character split across reads stays intact.
    text = raw.decode('gb18030', errors='replace')

    # Multi-digit shelf/slot/subcard numbers are accepted as well.
    name_match = re.search(r'\w+gei-\d+\/\d+\/\d+\/\d+', text)
    in_match = re.search(r'In_Bytes\s+(\d+)', text)
    out_match = re.search(r'E_Bytes\s+(\d+)', text)

    record = {
        "time": datetime.now(),
        "host_ip": hostname,
        "interface": name_match.group(0) if name_match else None,
        "In_Bytes": in_match.group(1) if in_match else None,
        "E_Bytes": out_match.group(1) if out_match else None,
    }
    pd_output = pd.DataFrame([record])
    pd_output.to_csv(os.path.join('.', 'show_interface_xgei.csv'),
                     index=False, encoding='gb18030')
    return pd_output
def excute_ping(tn, hostname, target="129.201.0.1", *, initial_wait=5.0, idle_timeout=30.0):
    """Ping *target* from the device and save the parsed result to a CSV file.

    Args:
        tn: Logged-in Telnet session (needs ``write`` and ``read_very_eager``).
        hostname: Device address; stored in ``host_ip`` and used in the
            output file name.
        target: Address the device should ping.
        initial_wait: Seconds to wait after sending the command before the
            first read.
        idle_timeout: Seconds without new data before giving up.

    Returns:
        A one-row DataFrame with ``host_ip``, ``success_rate`` (percent, 0
        when not reported), ``successful_pings`` (first number of the first
        ``(a/b)`` group, 0 when absent) and ``ping_result`` (the decoded
        device output). It is also written to
        ``ping_results-<hostname>-<YYYYmmdd-HHMMSS>.csv`` (utf-8).

    Raises:
        TimeoutError: The prompt did not come back within *idle_timeout*
            seconds of the last received data.
    """
    tn.write(f"ping {target}".encode('utf-8') + b'\r\n')
    if initial_wait > 0:
        time.sleep(initial_wait)  # give the device time to run the probes

    raw = b""
    answered = 0  # offset just past the last pager marker already handled
    last_data = time.monotonic()
    while True:
        chunk = tn.read_very_eager()
        if chunk:
            raw += chunk
            last_data = time.monotonic()
        pager_at = raw.rfind(b"--More--", answered)
        if pager_at != -1:
            answered = pager_at + len(b"--More--")
            if b"#" not in raw[answered:]:
                tn.write(b" ")  # ask for the next page
                continue
        if b"#" in raw[answered:]:
            break  # prompt is back: the command has finished
        if time.monotonic() - last_data > idle_timeout:
            raise TimeoutError(
                f"no prompt from {hostname} after {idle_timeout} s of silence")
        time.sleep(1)
    # Decode once so a multi-byte character split across reads stays intact.
    result_str = raw.decode('gb18030', errors='replace')

    # Missing figures count as a complete failure (0).
    rate_match = re.search(r'Success rate is (\d+) percent', result_str)
    count_match = re.search(r'\((\d+)/\d+\)', result_str)
    record = {
        "host_ip": hostname,
        "success_rate": int(rate_match.group(1)) if rate_match else 0,
        "successful_pings": int(count_match.group(1)) if count_match else 0,
        "ping_result": result_str,
    }
    pd_output = pd.DataFrame([record])

    output_filename = f'ping_results-{hostname}-{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
    pd_output.to_csv(output_filename, mode='a', index=False, encoding='utf-8')
    print(f"Ping result saved to {output_filename}")
    return pd_output
def visualize_ping_results(ping_df, output_path='plot3333.png'):
    """Save a horizontal bar chart of ping success rate per host.

    Args:
        ping_df: DataFrame with ``host_ip`` and ``success_rate`` columns.
        output_path: Image file to write.

    Returns:
        None. Nothing is drawn or written when *ping_df* has no rows (for
        example when no host could be reached).
    """
    if ping_df.empty:
        # An empty frame may not even have the expected columns.
        print("No ping results to plot")
        return

    ordered = ping_df.sort_values(by='success_rate', ascending=False)

    fig = plt.figure(figsize=(10, 6))
    try:
        plt.barh(ordered['host_ip'], ordered['success_rate'], color='steelblue')
        plt.xlabel('Success Rate (%)')
        plt.ylabel('Host IP')
        plt.title('Ping Success Rates per Host')
        plt.xlim(0, 100)  # success rate is a percentage
        plt.savefig(output_path)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
def files_to_one():
    """Merge every ``ospf*.csv`` in the current directory into ``ospf.csv``.

    The merged file itself also matches ``ospf*.csv``; it is skipped as an
    input so that repeated runs do not duplicate rows. A file that cannot be
    read is reported and left out of the result.

    Returns:
        The merged DataFrame (empty when no input file could be read). The
        same data is written to ``./ospf.csv`` (gb18030, no index).
    """
    base_dir = os.path.abspath('./')
    output_path = os.path.join(base_dir, 'ospf.csv')

    frames = []
    for path in sorted(glob.glob(os.path.join(base_dir, r'ospf*.csv'))):
        if os.path.abspath(path) == output_path:
            continue  # never feed the previous merge result back in
        try:
            sheet = pd.read_csv(path, encoding='gb18030', doublequote=False,
                                converters={u'code': str}, engine="python")
        except (OSError, ValueError) as exc:
            # pandas parser, empty-data and decoding errors are ValueError
            # subclasses.
            print('读取异常', path, exc)
            continue
        frames.append(sheet)

    # Concatenate once instead of growing a frame inside the loop.
    merged = pd.concat(frames, axis=0) if frames else pd.DataFrame()
    merged.to_csv(output_path, index=False, encoding='gb18030')
    return merged
if __name__ == '__main__':
    # Script entry point: ping from every reachable host and plot the
    # success rates.
    hostnames = ['129.201.0.1', '127.0.0.1']  # Add your host IPs here

    # NOTE(review): credentials were hard-coded; they can now be overridden
    # through the environment, with the previous values kept as defaults.
    # Telnet itself still sends them in clear text.
    username = os.environ.get('TELNET_USERNAME', 'huawei')
    password = os.environ.get('TELNET_PASSWORD', 'huawei')

    # One DataFrame per host that answered.
    collected = []
    for hostname in hostnames:
        tn = connect_telnet(hostname, username, password)
        if tn is None:
            continue
        try:
            collected.append(excute_ping(tn, hostname))
        except (OSError, EOFError) as exc:
            # A dropped or silent session must not stop the remaining hosts.
            print(f"ping failed on {hostname}: {exc}")
        finally:
            tn.close()  # always release the session

    if collected:
        all_ping_results = pd.concat(collected, ignore_index=True)
        # Visualize the ping success rates
        visualize_ping_results(all_ping_results)
    else:
        print("No ping results collected; nothing to plot")