0
点赞
收藏
分享

微信扫一扫

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️


大家好,我是​​


​​😎

前面我写了篇水文《​​获取当前局域网下所有连接设备的ip地址和mac地址​​》,但是没有想到的是居然上了热榜,也是我个人第一篇上热榜的文章,阅读量瞬间飙升💥。然而我的硬核技术文却几乎没有人看到。既然又很多人对这个话题感兴趣,那么我们就继续对相关原理深挖,最好能自己实现,理解透彻。

首先我们回顾一下前文,在前文中我介绍了windows下获取ip地址和arp映射表的命令,通过分析最新arp映射表知道当前网段下在线或下线的设备⭐。

文章使用的技术是通过python调用系统ping命令,实现arp表的更新。然而系统自带的ping命令访问整个网段的ip时,耗时达到了2分钟,后面通过多线程加速,最终也只能提速到最快25秒。这个速度实在延时过大,无法应用于更高级的应用😇。

今天我们的目标是就是将Ping整个网段IP的总耗时降低到5秒以内,这样我们就能够在5秒内知道指定mac地址设备的上下线,例如开发一个BOSS来了的摸鱼神器,只要老板的手机一连上wifi,这边在5秒内收到通知,立马停止摸鱼,就保证了平时放心大胆的摸鱼⚡。

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_icmp

那么如何提速呢?经过我几天的苦思冥想,并在学习了一些网络知识后,自己实现了PING命令,成功的实现了放心大胆的摸鱼。于是,在我看了几本书,写了几千行代码,踩了几百个坑后,终于把相关知识理解透了。下面是我将涉及到的核心知识点总结成了这篇文章,所以这篇文章都是非常精简的干货,强烈❤️建议收藏❤️。

学完本文,你的力量将不仅仅止于此,还能够底层化开发任何基于IP协议的自定义协议,当然这要看你自己是否具有举一反三的能力。甚至你还能继续自己深挖,去研究开发比IP协议更底层的协议。

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_python_02

渴望吗?渴望那就学起来吧⁉️下面是本文的知识点目录:

文章目录

  • ​​🎥socket套接字核心知识​​
  • ​​📚socket简介🔥​​
  • ​​📹 socket链接🔥​​
  • ​​💾 TCP与UDP通信模型🔥​​
  • ​​🎏SOCK_RAW原始套接字🔥​​
  • ​​🔏 socket模块和对象的其他常用方法🔥​​
  • ​​📥struct二进制数据的转换🔥​​
  • ​​📇 Ping 的工作原理​​
  • ​​📂ICMP 报文格式🔥​​
  • ​​🍋ICMP查询报文类型🔥​​
  • ​​📺ICMP差错报文类型🔥​​
  • ​​📌socket原始套接字实现PING命令​​
  • ​​🌎发送回送请求🔥​​
  • ​​🌌接收回送响应🔥​​
  • ​​🎉完善ping命令的开发🔥​​
  • ​​🔔借助arp表获取当前网段在线设备🔥​​
  • ​​📇 双线程获取指定网段的在线设备🔥​​
  • ​​📟完成BOSS来了的摸鱼神器🔥​​
  • ​​☀️总结​​

🎥socket套接字核心知识

📚socket简介🔥

​进程间通信​​指运行的程序之间的数据共享,在1台电脑上可以通过进程号(PID)来唯一标识一个进程进行通信。

在网络中,TCP/IP协议族网络层的“ip地址”可以唯一标识网络中的主机,而传输层的“协议+端口”可以唯一标识主机中的应用进程(进程)。网络中的进程通信就可以通过​​ip地址,协议,端口​​这个标志与其它进程进行交互。

socket(简称 ​​套接字​​) 就是实现网络进程间通信的一种方式,网络上各种各样的服务大多都是基于 Socket 来完成通信的。为了建立通信通道,网络通信的每个端点拥有一个socket套接字对象,它们允许程序接受并进行连接,如发送和接受数据。

📹 socket链接🔥

在 Python 中 使用socket 模块的函数 socket 就可以完成:

import socket
socket.socket(family=-1, type=-1, proto=-1, fileno=None)

参数说明:

family为指定的地址族,主要有三种:

  • socket.AF_UNIX :用于同一台机器进程间通信
  • socket.AF_INET :基于ipv4协议的Internet 进程间通信
  • socket.AF_INET6 :基于ipv6协议的Internet 进程间通信

更多的地址族还包括,​​socket.AF_BLUETOOTH​​​蓝牙相关、​​socket.AF_VSOCK​​​虚拟机通信、​​socket.AF_PACKET​​直连网络设备底层接口等。

type为指定的套接字类型,主要有三种:

  • socket.SOCK_STREAM :流式套接字,使用面向连接的TCP协议实现字节流的传输
  • socket.SOCK_DGRAM :数据报套接字,使用面向非连接的UDP实现数据报套接字
  • socket.SOCK_RAW:原始套接字,该套接字允许对较低层协议(如 IP或 ICMP)进行直接访问

更多套接字类型还包括​​socket.SOCK_RDM​​​和​​socket.SOCK_SEQPACKET​​等。

💾 TCP与UDP通信模型🔥

对于tcp或udp套接字可以直接使用以下方式进行创建:

import socket

# 创建tcp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建udp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 不用的时候,关闭套接字
s.close()

**UDP通信模型:**在通信开始之前,不需要建立相关的链接,只需要发送数据即可,类似于​​写信​​:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_icmp_03

UDP服务端示例代码:

from socket import *
# 创建套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 绑定本地的相关信息,不绑定系统会随机分配
udp_socket.bind(('0.0.0.0', 8080))
# 等待接收对方发送的数据
recv_data = udp_socket.recvfrom(1024) # 1024表示本次接收的最大字节数
# 显示接收到的数据,第1个元素是对方发送的数据,第2个元素是对方的ip和端口
print(recv_data[0].decode('u8'))
# 关闭套接字
udp_socket.close()

UDP客户端示例代码:

from socket import *
# 创建udp套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 发送数据到指定的电脑上的指定程序中
udp_socket.sendto("你好,服务器~".encode('u8'), ('192.168.1.103', 8080))
# 关闭套接字
udp_socket.close()

**TCP通信模型:**在通信开始之前,一定要先建立相关的链接,才能发送数据,类似于​​打电话​​:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_python_04

TCP服务端示例代码:

from socket import *

# 创建socket
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# 服务器绑定本机ip和端口
tcp_server_socket.bind(('0.0.0.0', 8080))
# 监听端口,128表示最大同时接收128个客户端链接
tcp_server_socket.listen(128)
# 如果有新的客户端来链接服务器,那么就产生一个新的套接字专门为这个客户端服务
client_socket, clientAddr = tcp_server_socket.accept()
# 接收对方发送过来的数据
recv_data = client_socket.recv(1024) # 接收1024个字节
print('接收到的数据为:', recv_data.decode('u8'))
# 发送一些数据到客户端
client_socket.send("你好客户端!".encode('u8'))
# 关闭为这个客户端服务的套接字
client_socket.close()

TCP客户端示例代码:

from socket import *

# 创建socket
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
# 链接服务器
tcp_client_socket.connect(('192.168.3.31', 8080))
tcp_client_socket.send("测试发送的内容".encode("u8"))
# 接收对方发送过来的数据,最大接收1024个字节
recvData = tcp_client_socket.recv(1024)
print('接收到的数据为:', recvData.decode('u8'))
# 关闭套接字
tcp_client_socket.close()

🎏SOCK_RAW原始套接字🔥

上述两种套接字是常规的套接字模式,第三个参数省略或为零(IP协议)会自动选择正确的协议(TCP协议和UDP协议)。

当我们指定套接字类型为​​socket.SOCK_RAW​​原始套接字时,第三个参数就需要指定proto协议号。

python的socket库预定义的协议号有:

  • socket.IPPROTO_TCP:TCP传输协议,值为6
  • socket.IPPROTO_UDP:UDP传输协议,值为17
  • socket.IPPROTO_ICMP:ICMP协议,值为1
  • socket.IPPROTO_IP:IP协议,值为0
  • socket.IPPROTO_RAW:可自行构建IP头部构建更底层的协议,值为1

也可以通过协议名称获取协议号常量:

import socket

print(socket.IPPROTO_ICMP, socket.getprotobyname("icmp"),
socket.IPPROTO_ICMP == socket.getprotobyname("icmp"))

1 1 True

可以看到两种方式获取协议号均可。

通过原始套接字我们可以使用ICMP或更底层的协议进行通讯从而实现更高级的功能。

我们需要使用ICMP协议进行网络通信就可以使用​​SOCK_RAW​​原始套接字:

icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

🔏 socket模块和对象的其他常用方法🔥

socket模块的其他常用方法:

​socket.gethostbyname​​:将主机名转换为IPv4地址格式。IPv4地址以字符串形式返回

​socket.gethostname​​:返回包含Python解释器当前正在执行的机器的主机名的字符串

​socket.gethostbyaddr​​:根据IP地址获取主机名

​socket.getprotobyname​​:将Internet协议名称转换为协议号常量

在主机字节顺序与网络字节顺序不相同的机器上,使用以下方法转换:

网络顺序转换为主机字节顺序

主机顺序转换为网络字节顺序

32位正整数

4字节的交换操作

​socket.ntohl​

​socket.htonl​

16位正整数

2字节的交换操作

​socket.ntohs​

​socket.htons​

在主机字节顺序与网络字节顺序相同的机器上,执行以上方法是无操作的。

​socket.inet_aton​​:将字符串格式的IPv4地址打包为32位4字节的字节对象

​获取本机ip地址方法1​​:先获取本机主机名,再通过主机名获取ip

import socket

ip = socket.gethostbyname(socket.gethostname())
print(ip)

192.168.3.31

获取本机所有网卡的IP:

ips = socket.gethostbyname_ex(socket.gethostname())[-1]
print(ips)

['192.168.3.31']

⚠️注意:如果本机没有正确设置主机名时可能无法获取本机ip地址。

socket套接字对象的公用函数套接字函数:

  • s.getpeername():返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)
  • s.getsockname():返回套接字自己的地址。通常是一个元组(ipaddr,port)
  • s.setsockopt(level,optname,value):设置给定套接字选项的值。
  • s.getsockopt(level,optname[.buflen]):返回套接字选项的值。
  • s.settimeout(timeout):设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect)
  • s.gettimeout():返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None
  • s.fileno():返回套接字的文件描述符
  • s.setblocking(flag):如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。
  • s.makefile():创建一个与该套接字相关连的文件。

​获取本机ip地址方法2​​:向任意网络地址发送一个无状态的UDP请求后,再通过套接字对象获取自己的地址从而获取本机地址

import socket

def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('1.1.1.1', 80))
ip, port = s.getsockname()
return ip
# 获取本机IP
ip = get_local_ip()
print(ip)

192.168.3.31

✅即使无法连接Internet目标地址无法访问(发出报文会丢失),也可以使用该方法获取本机ip地址。

📥struct二进制数据的转换🔥

Python提供了一个struct模块来解决bytes和其他二进制数据类型的转换。

struct的pack函数把任意数据类型变成bytes。

import struct
print(struct.pack('>I', 10240099))

b'\x00\x9c@c'

pack 的第一个参数是处理指令:

  • ​>​​:表示字节顺序是 big-endian,也就是网络序
  • ​I​​:表示 4 字节无符号整数
  • ​H​​:2 字节无符号整数。

后面的参数字节个数要和处理指令一致。
unpack 把 bytes 变成相应的数据类型:

>>> struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80')
(4042322160, 32896)

struct模块定义的数据类型可以参考Python官方文档:

​​https://docs.python.org/zh-cn/3/library/struct.html#format-characters​​

格式

C 类型

Python 类型

标准大小

注释

x

填充字节


c

char

长度为 1 的字节串

1

b

signed char

整数

1

(1), (2)

B

unsigned char

整数

1

-2

?

_Bool

bool

1

-1

h

short

整数

2

-2

H

unsigned short

整数

2

-2

i

int

整数

4

-2

I

unsigned int

整数

4

-2

l

long

整数

4

-2

L

unsigned long

整数

4

-2

q

long long

整数

8

-2

Q

unsigned long long

整数

8

-2

n

ssize_t

整数

-3

N

size_t

整数

-3

e

-6

浮点数

2

-4

f

float

浮点数

4

-4

d

double

浮点数

8

-4

s

​char []​

字节串

p

​char []​

字节串

P

​void *​

整数

-5

📇 Ping 的工作原理

ping 基于 ​​ICMP​​ 协议工作的,ICMP 全称是 Internet Control Message Protocol,也就是互联网控制报文协议。ping 发出的ICMP 报文实际上是以侦察网络状态的形式实现了控制,反馈网络状态,从而调整传输策略以此控制整个局面。

​ICMP​​​ 主要的功能包括:**确认 IP 包是否成功送达目标地址、报告发送过程中 IP 包被废弃的原因和改善网络设置等。**ICMP 协议主要负责在 ​​IP​​​ 通信中通知某个 ​​IP​​ 包未能达到目标地址的原因。

📂ICMP 报文格式🔥

Ping命令发出的ICMP 报文封装在 IP 包里面的,结构如下:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_icmp_05

上述报文格式中,左边的IP头部分不需要太关心,因为我们使用socket的原始套接字模式会自动帮我们封装IP头部分,右边的ICMP报文才是我们需要关心的部分。

⚠️注意:相比原生的 ICMP,Ping命令发出的ICMP报文多出了标识符和序号两个字段。

对于ICMP报文的类型,有两大类:

  1. 查询报文类型:用于诊断的查询消息
  2. 差错报文类型:通知出错原因的错误消息

不过咱们使用的PING只需要使用查询报文类型中的回送应答和回送请求。

常见的 ICMP 类型包括:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_icmp_06

🍋ICMP查询报文类型🔥

回送消息:0表示回送应答,8表示回送请求。用于进行通信的主机或路由器之间,判断所发送的数据包是否已经成功到达对端的一种消息。

​ping​​ 命令是通过ICMP协议的回送消息实现的:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_windows_07

发送端主机向接收端主机发送一个回送请求(​​ICMP Echo Request Message​​​,类型 ​​8​​​),只要正常接收到接收端返回的回送响应(​​ICMP Echo Reply Message​​​,类型 ​​0​​),则代表发送端主机到接收端主机可达。

📺ICMP差错报文类型🔥

对于差错报文类型,在本次编码中不会用到,无需深究,简单了解一下即可。

ICMP 常见差错报文:

  • 目标不可达消息 —— 类型 为​​3​
  • 原点抑制消息 —— 类型​​4​
  • 重定向消息 —— 类型​​5​
  • 超时消息 —— 类型​​11​

目标不可达消息(Destination Unreachable Message):

IP 路由器无法将 IP 数据包发送给目标地址时,会给发送端主机返回一个目标不可达的 ICMP 消息,并在这个消息中显示不可达的具体原因,原因记录在 ICMP 包头的代码字段。

由此,根据 ICMP 不可达的具体消息,发送端主机也就可以了解此次发送不可达的具体原因

目标不可达的原因有:

  • 网络不可达代码为​​0​
  • 主机不可达代码为​​1​
  • 协议不可达代码为​​2​
  • 端口不可达代码为​​3​
  • 需要进行分片但设置了不分片位代码为​​4​

原点抑制消息(ICMP Source Quench Message):

​ICMP​​ 原点抑制消息的目是为了缓和网络拥堵的问题,当路由器向低速线路发送数据时,其发送队列的缓存变为零而无法发送出去时,可以向 IP 包的源地址发送一个 ICMP 原点抑制消息

但是收到这种 ICMP 消息的主机并不见得真的会增大 IP 包的传输间隔,还可能会引起不公平的网络通信,所以一般不被使用。

重定向消息(ICMP Redirect Message):

在路由器持有更好的路由信息时,发现发送端主机使用了​​不是最优​​的路径发送数据,那么路由器会返回一个 ICMP 重定向消息给这个主机。这个消息中包含了最合适的路由信息和源数据,发送端下次可以发给另外一个更近的路由器。

超时消息(ICMP Time Exceeded Message):

IP 包中有一个8位的字段叫做 ​​TTL​​​ (​​Time To Live​​,生存周期),它的值随着每经过一次路由器就会减 1,直到减到 0 时该 IP 包会被丢弃。

此时,IP 路由器将会发送一个ICMP超时消息给发送端主机,并通知该包已被丢弃。设置 IP 包生存周期的主要目的是为了在路由控制遇到问题发生循环状况时,避免 IP 包无休止地在网络上被转发。

也可以通过设置一个较小的 TTL 值 控制包的到达范围。

📌socket原始套接字实现PING命令

学了这么多基础的网络知识,我们最终为了什么?就是为了能够自己实现PING命令。相关的网络知识还有很多,但对于我们实现PING命令并没有太大关系,就暂不做深究。

下面我们从实战出现,一步步调试继续深挖PING命令的实现原理。

首先我们创建​​ICMP​​协议的原始套接字链接:

import socket

icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

🌎发送回送请求🔥

然后需要向目标发送一个回送请求,结构如下:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_python_08

下面开始组织报文数据(对于系列号,我们可以自行决定要发送的值):

import os
import time
import struct

# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum, identifier, serial_num = 8, 0, 0, os.getpid() & 0xFFFF, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据,包含当前时间戳,后面用Q补齐到192位
data = struct.pack("d", time.time()).ljust(192, b"Q")

计算校验和的规则这里我已经写成代码,大家可以直接看代码:

def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2

if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff

total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)

⚠️注意:最终返回时通过socket.htons方法将数据从主机序转换为网络序。

然后就可以计算出校验和重新打包header:

checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)

然后就可以发送了:

# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
target_addr = "192.168.3.31"
icmp_socket.sendto(header + data, (target_addr, 1))

⚠️注意:虽然发送给了1号端口,但其实发送给任意端口都可以。

🌌接收回送响应🔥

回送响应与回送请求结构一致:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_网络协议_09

发送完消息后,我们就可以接收回送相应:

# 接收回送请求
recv_packet, addr = icmp_socket.recvfrom(1024)
# 前20字节是ip协议的ip头
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier, serial_num = struct.unpack(
"bbHHh", icmp_header
)
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])

⚠️注意:我们接收的回送请求中包含了前20自己的IP头。

从选项数据中可解析出了这个包发送的时间(之前发出时写入的时间)。

🎉完善ping命令的开发🔥

虽然标准的PING命令是用以上协议规则实现的,但我们并不需要完全按照上述规范,例如标识符可以发送任何16位的值,序号可以从任意数值开始,选项数据192位的空间也可以用来存放任何数据。

我们在接收回送响应时需要检查包的标识符,确定是自己发出的包才接收。

最终封装出如下方法:

import struct
import time
import os
import socket
import select


def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2

if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff

total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))


def receive_pong(icmp_socket, identifier=os.getpid() & 0xFFFF, serial_num=0, timeout=2):
icmp_socket.settimeout(timeout)
time_remaining = timeout
while True:
start_time = time.time()
# 接收回送请求
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
time_received = time.time()
time_spent = time_received-start_time
# 前20字节是ip协议的ip头
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier_reciver, serial_num_reciver = struct.unpack(
"bbHHh", icmp_header
)
if identifier_reciver != identifier or serial_num != serial_num_reciver:
# 不是当前自己发的包则忽略
time_remaining -= time_spent
if time_remaining <= 0:
raise socket.timeout
continue
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])
return int((time_received - time_sent)*1000),

​192.168.3.31​​是我当前本机的局域网IP地址,测试一下:

icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
ip = '192.168.3.31'
sent_ping(icmp_socket, ip)
try:
delay, ip_received = receive_pong(icmp_socket, timeout=2)
print(f"延迟:{delay}ms,对方ip:{ip_received}")
except socket.timeout as e:
print("超时")

延迟:0ms,对方ip:192.168.3.31

然后再批量ping一下指定当前网段的所有IP:

def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('1.1.1.1', 80))
ip, port = s.getsockname()
return ip


icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

local_ip = get_local_ip()
net_segment = local_ip[:local_ip.rfind(".")]
ips = []
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)
print("ping", ip, end=" ")
try:
delay, ip_received = receive_pong(icmp_socket, timeout=0.1)
print(f"延迟:{delay}ms,对方ip:{ip_received}")
ips.append(ip)
except socket.timeout as e:
print("超时")
print(ips)
icmp_socket.close()

超时时间0.1秒时,总耗时30秒:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_python_10

超时时间设置为0.01秒时,总耗时则为2.59秒。

🔔借助arp表获取当前网段在线设备🔥

**如何尽量快的获取到当前在线的设备?**经过测试发现,被ping后,ping不通的机器,arp表能够自动删除对应的条目,那么思路1就是快速的向全网段发送回送请求不等待回送响应,然后2秒后取查arp表,即可看到最新的在线设备。

实现思路1:

import struct
import time
import os
import re
import socket
import pandas as pd


def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2

if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff

total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))

def get_arp_ip_mac():
header = None
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if not line or line.startswith("接口"):
continue
if header is None:
header = re.split(" {2,}", line.strip())
break
df = pd.read_csv(res, sep=" {2,}",
names=header, header=0, engine='python')
return df


def ping_net_segment_all(net_segment):
with socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as icmp_socket:
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)


net_segment = "192.168.3"
ping_net_segment_all(net_segment)
# 等待回送响应的到来,预计1秒之内
time.sleep(1)
# 读取最新的arp表
df = get_arp_ip_mac()

于是我们获取到了当前网段在线的设备列表:

⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️_windows_11

📇 双线程获取指定网段的在线设备🔥

不过使用arp表查看有个缺陷,只能查看当前网段的,跨网段的在线设备似乎看不到。经分析我使用的台式机通过有线连接到3网段,而手机通过WiFi连接到2网段,所以必须能够分析2网段设备的在线设备才有意义。

思路2:用两个线程一个线程专门发回送请求,一个线程专门接收回送响应,可以通过回送响应获取IP地址,于是就可以得到指定网段的当前在线的设备的ip。

先完成获取在线设备列表:

from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd


def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2

if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff

total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))


def receive_pong(icmp_socket, net_segment, timeout=2):
icmp_socket.settimeout(timeout)
ips = set()
while True:
start_time = time.time()
try:
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
if ip.startswith(net_segment):
ips.add(ip)
except socket.timeout as e:
break
return ips


def ping_net_segment_all(icmp_socket, net_segment):
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)


icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
future = p.submit(receive_pong, icmp_socket, "192.168.2", 3)
ips = future.result()

运行结果,目前我的手机ip为​​192.168.2.122​​,运行后被顺利检测到:

{'192.168.2.1',
'192.168.2.122',
'192.168.2.17',
'192.168.2.18',
'192.168.2.19',
'192.168.2.20',
'192.168.2.21',
'192.168.2.22',
'192.168.2.23',
'192.168.2.49'}

关闭手机WiFi后,再次运行,顺利看到该IP的下线。

📟完成BOSS来了的摸鱼神器🔥

在已经将更新时间缩短到5秒以内时,咱们就可以PING指定网段,最后完成分析设备上下线的功能,从而达到最终的目的完成BOSS来了的摸鱼神器。

from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd


def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2

if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff

total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))


def receive_pong(icmp_socket, net_segment, timeout=2):
icmp_socket.settimeout(timeout)
ips = set()
while True:
start_time = time.time()
try:
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
if ip.startswith(net_segment):
ips.add(ip)
except socket.timeout as e:
break
return ips


def ping_net_segment_all(icmp_socket, net_segment):
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)


last = None
while 1:
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
future = p.submit(receive_pong, icmp_socket, "192.168.2")
ips = future.result()
if last is None:
print("当前在线设备:", ips)
if last:
up = ips-last
if up:
print("\r新上线设备:", up, end=" "*100)
down = last-ips
if down:
print("\r刚下线设备:", down, end=" "*100)
last = ips
time.sleep(3)

结果示例:

当前在线设备: {'192.168.2.122', '192.168.2.18', '192.168.2.20', '192.168.2.1', '192.168.2.23', '192.168.2.49', '192.168.2.21', '192.168.2.17', '192.168.2.22', '192.168.2.19'}
刚下线设备: {'192.168.2.122'}

经测试,手工关闭或打开手机WiFi能够顺利看到设备IP的打印信息。这种方法虽然无法获取MAC地址,但是经测试,同一台机器都会被分配同一个IP,在我当前的网络下是满足要求的,只需要知道老板手机连接的IP就行了。或者观察一下,老板走之后,到底哪个IP下线了,专门去监控这个IP。

更安全的做法就是每看到有新的IP上线都额外警惕一点,如果你是win10系统可以使用如下方法实现系统通知:

from win10toast import ToastNotifier

toaster = ToastNotifier()
toaster.show_toast("通知标题", "通知内容!", duration=10)

上述三个参数分别是通知标题,通知的内容和通知持续的时间,对于摸鱼这种事持续时间可以调大掉,再手工关闭通知,通过​​pip install win10toast​​安装。

☀️总结

总算做成了这个摸鱼神器,不过虽然我上面一本正经的讲的津津有味,但不会真有人打算拿这个代码去应用于实际去对付老板吧⁉️不会吧,不会吧⁉️

真打算做摸鱼神器的童鞋,我个人推荐搞个网络摄像头,写个人物图像识别的代码,发现有人进来了都自动提醒,这样才可以更放心的摸鱼。万一老板没连wifi就过来了,这就有点坑。

开发摸鱼神器不是本文本身的目的,学习网络知识自主实现网络协议,从通过实际例子理解网络协议才是本文真正的目的。为了构思本文,我也是苦思冥想了几天几夜了,

举报

相关推荐

0 条评论