0
点赞
收藏
分享

微信扫一扫

python icmp\dns\http监控网络各个节点状态,并记录日志

配置文件如下:支持多节点:

{
"dns":[{"domainname":"www.baidu.com","dnsserver":"114.114.114.114"}],
"icmp":[{"ip_address":"114.114.114.114"},{"ip_address":"172.17.1.1"},{"ip_address":"192.168.101.1"},{"ip_address":"192.168.9.1"}],
"http":[{"url":"www.baidu.com","title":"百度"}]
}

# 读取配置文件,配置文件,每个监控类型允许多个监控节点,函数返回包含多个节点的列表
def read_config():
try:
file = open("config.json")
config = file.read()
file.close()
config = json.loads(config)

http = config["http"]
icmp = config["icmp"]
dns = config["dns"]

return http,icmp,dns
except Exception:
print("read config error")
write_logs("read config error")
return

# 用来写result结果,只有网络异常的时候才会输出
def write_resules(filename,results):
try :
file = open(filename,"a")
file.write(time.asctime(time.localtime(time.time())) + " : " + results + "\n")
file.close()
except Exception:
print(time.asctime(time.localtime(time.time())) + "write results is error")

# 写日志文件,方便在出现错误的时候判断错误原因,只有在出错的时候才会输出
def write_logs(logs):
try :
file = open("logs.txt","a")
file.write(time.asctime(time.localtime(time.time())) + " : " + logs + "\n")
file.close()
except Exception:
print(time.asctime(time.localtime(time.time())) + "write log is error")

# 调用requests发送数据包
def send_http_packet(url):
# requests.packages.urllib3.disable_warnings()
user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36"
headers = {'User-Agent': user_agent}
url = "http://" + url
try:
response = requests.get(url, headers)
response_html = response.content.decode()
return response_html
except Exception:
print('send http packet error,error')
write_logs('send http packet error,error')
return

# 使用bs4解析获取到的数据,并和给出的网页标题对比
def check_http(url,title):
response_html = send_http_packet(url)
if response_html != None and title != None :
soup = bs4.BeautifulSoup(response_html, 'lxml')
html_title = soup.title.text
if title in html_title:
return True
else:
write_logs("title not same ,web title is : " + html_title)
return False
else:
print('html or title is None')
return

# 发送数ping数据包
def send_icmp_packet(ip_address):
try:
response_packet = sr1(IP(dst=ip_address)/ICMP(),timeout=2,verbose = False)
return response_packet
except Exception:
print('send icmp packet error,error')
write_logs('send icmp packet error,error')
return

# 检查icmp数据包有没有正常恢复
def check_icmp(ip_address):
response_packet = send_icmp_packet(ip_address)
if response_packet != None:
return True
else:
return

# 发送DNS请求
def send_dns_packet(domainname,dnsserver):
i = IP(dst=dnsserver)
u = UDP(dport = 53)
d = DNS(rd=1)
d.qd = DNSQR(qname = domainname , qtype = "A" ,qclass = "IN")
dns_request_packet = i/u/d

try:
dns_response_packet = sr1(dns_request_packet,timeout=2,verbose = False)
return dns_response_packet
except Exception:
print("send dns packet error ")
write_logs('send icmp packet error,error')
return

# 检查DNS请求是否正常返回
def check_dns(domainname,dnsserver):
dns_response_packet = send_dns_packet(domainname,dnsserver)

if dns_response_packet != None:
return True
else:
return

# 为了方便多线程调用,每个监控类型都设置一个启动程序
def start_http(url, title):
while True:
if check_http(url, title) :
print("http True : " + url + "," + title)
else:
print("http False : " + url + "," + title)
write_resules("http_results.txt","http False : " + url + "," + title)
time.sleep(2)

# 为了方便多线程调用,每个监控类型都设置一个启动程序
def start_icmp(ip_address):
while True:
if check_icmp(ip_address) :
print("icmp True : " + ip_address)
else:
print("icmp False : " + ip_address)
write_resules("icmp_results.txt","icmp False : " + ip_address)
time.sleep(2)

# 为了方便多线程调用,每个监控类型都设置一个启动程序
def start_dns(domainname,dnsserver):
while True:
if check_dns(domainname,dnsserver) :
print("dns True : " + domainname + "," + dnsserver)
else:
print("dns False : " + domainname + "," + dnsserver)
write_resules("dns_results.txt","dns False : " + domainname + "," +dnsserver)
time.sleep(2)

# 主函数
def main():
http,icmp,dns= read_config()

# 循环取出http监控节点,每个节点启动一个线程
for h in http:
url = h["url"]
title = h["title"]
moniter_http = threading.Thread(target=start_http,args=(url, title))
moniter_http.start()

# 循环取出icmp监控节点,每个节点启动一个线程
for i in icmp:
ip_address = i["ip_address"]
moniter_icmp = threading.Thread(target=start_icmp,args=(ip_address,))
moniter_icmp.start()

# 循环取出dns监控节点,每个节点启动一个线程
for d in dns:
domainname = d["domainname"]
dnsserver = d["dnsserver"]
moniter_dns = threading.Thread(target=start_dns,args=(domainname,dnsserver))
moniter_dns.start()

if __name__ == '__main__':
main()

 



举报

相关推荐

0 条评论