Locust 使用
Locust 简介:
Locust 是一个开源性能测试工具,使用Python 代码来定义用户行为,用它可以模拟百万计的并发用户访问你的系统,并且支持分布式压测。官网地址 https://www.locust.io/
Locust 安装
- 虚拟环境的安装(虚拟环境是为了实现Python 环境的隔离,解决三方库不同版本之间的依赖冲突问题,保持环境的一致性)
第一种方式是利用pycharm 工具来设置虚拟环境
第二种方式是利用python 3.6 版本以后自带的venv 工具
①在pycharm 终端输入命令 python -m venv locust 其中前面是固定写法 locust 是自定义虚拟环境名字可随意书写,新建完成之后会在当前目录下多出来locust 目录
②需要激活刚刚新建的虚拟环境 cd locust/Script 目录下 ,输入activate 命令激活虚拟环境
激活之后 这里会显示当前虚拟环境的名字,也可以用pip list 来验证当前的环境
③退出虚拟环境的话,终端输入deactivate
- 在虚拟环境中安装Locust
通过pip 命令安装 pip install locust
或者指定国内安装源安装 pip install -i https://pypi.douban.com/simple locust
环境安装完成之后验证环境:locust -V
Locust 脚本书写
一个Locust 脚本可以定义两个类
- 任务类继承TaskSet 编写具体的任务
- 用户类 继承HttpUser 类似一个控制器
任务类需要继承自TaskSet 类,任务类包含处理具体任务的一个函数或者多个函数,函数需要用@task 标识出来。
任务类中固定的on_start() 方式用于每个用户在初始化的时候只执行一次,必须用户的登录在所有的任务只需要执行一次就可以放在on_start() 里面来实现
on_stop() 用户级别的收尾操作,且每个用户只执行一次,可以用于用户任务结束后的一些环境数据清理动作
用户类需要继承自HttpUser类,用户类中常用属性:
- tasks 变量类型为列表,指定运行的任务类
- host 指定运行的服务器的ip 和端口
- wait_time 指定用户每次请求的间隙时间,一般结合between() 函数来使用
from locust import TaskSet, task, HttpUser, between
class MyGetTask(TaskSet):
url = 'http://localhost:8080/user'
def on_start(self):
print("用户初始化")
@task
def get_test(self):
query_param = {'name': 'hhz'}
response = self.client.get(self.url, name='查询用户接口', params=query_param, timeout=10)
print(response.json())
def on_stop(self):
print("用户收尾")
class MyUser(HttpUser):
tasks = [MyGetTask]
host = 'http://localhost:8080'
wait_time = between(2, 2)
脚本的运行
第一种在终端通过命令行 locust -f locusefile.py 来指定运行的文件
程序启动成功之后在浏览器输入localhost:8089
第二种在脚本文件中利用os.system() 方法来执行脚本命令
if name == ‘main’:
os.system(“locust -f locust_get.py”)
Locust 发送http 请求
locust 发送http 请求是基于Python request库,在具体的任务类中通过self.client.get() 或者self.client.post() 发送get 请求或者post 请求
发送get请求
@task
def get_test(self):
query_param = {'name': 'hhz'}
with self.client.get(self.url, name='查询用户接口', params=query_param, timeout=10) as response:
print(response.json())
发送post 请求
@task
def post_test(self):
data = {'username': 'hhz', 'pwd': '1234'}
# catch_response 自定义请求的成功与否
with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
res = response.json()
if res['code'] == 200:
response.success()
else:
response.failure(res['msg'])
print(response.json())
请求中参数说明
参数值 | 说明 |
---|---|
url | 对于一些query 参数可以用?param=value 的形式拼接到url中 |
name | 可选参数,定义标签名字,在Locust 的web监控面板里面可以体现出来 |
params | 可选参数,传入字典类型 ,一般用于接口请求中query 参数的传值 |
data | 可选参数,传入字典类型。post 请求中用于表单的提交,使用此参数会在请求头里面默认携带Content-Type=application/x-www-form-urlencoded |
json | 可选参数,传入字典类型,用于post 请求body体中json 数据的发送,使用此参数会在请求头中默认携带Content-Type=application/json |
headers | 可选参数,传入字典类型,请求头信息 |
cookies | 可选参数,传入字典类型,像请求中传入cookie 信息 |
files | 可选参数,传入字典类型({‘文件参数’:open(’’,‘r’)}), 上传文件 |
verify | 可选参数,如果为True 则https 请求会校验证书 |
catch_response | 为True可以自定义请求是否成功,locust 默认根据响应状态码是否为200来判定请求是否成功。实际业务还应根据具体的业务code 来判断请求是否成功。 |
关与response 常用的属性和方法
response.json() | 返回字典类型的响应数据,要求接口的响应数据为json 类型的数据 |
---|---|
response.content | 返回响应内容的二进制数据 |
response.text | 返回字符串 |
Locust参数化
参数化的思想是首先准备一个数据源,数据源可以自定义列表,也可以从数据库或者文件中读取。然后再在需要用的地方根据一定的规则去数据源取数据。
在参数化这个地方可以借鉴python 三方库faker pip install faker ,可以生成随机的姓名、手机号、地址、ip 等等信息 详细使用可以参考 https://www.cnblogs.com/pywen/p/14245369.html
随机参数化
参数值随机从数据源中获取
import os
import random
from faker import Faker
from locust import TaskSet, task, HttpUser, between
class MyPostTask(TaskSet):
url = 'http://localhost:8080/login1'
def on_start(self):
print("用户初始化")
self.phone_len = len(self.user.phone_list)
@task
def get_test(self):
data = {'username': 'hhz', 'pwd': '1234'}
# 产生随机数
index = random.randint(0, self.phone_len-1)
# 通过self.user.变量名 来获取User 类中定义的变量
data['username'] = self.user.phone_list[index]
print(data)
with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
res = response.json()
if res['code'] == 200:
response.success()
else:
response.failure(res['msg'])
print(response.json())
def on_stop(self):
print("用户收尾")
class MyUser(HttpUser):
tasks = [MyPostTask]
host = 'http://localhost:8080'
wait_time = between(2, 2)
phone_list = []
faker = Faker(locale='zh_CN')
# 构造手机号数据源
for i in range(1000):
phone_list.append(faker.phone_number()+"_" + str(i))
顺序参数化
参数值按照顺序从数据源中循环获取
import os
from faker import Faker
from locust import TaskSet, task, HttpUser, between
class MyPostTask(TaskSet):
url = 'http://localhost:8080/login1'
def on_start(self):
print("用户初始化")
self.phone_len = len(self.user.phone_list)
self.num = self.phone_len
@task
def get_test(self):
data = {'username': 'hhz', 'pwd': '1234'}
index = self.num % self.phone_len
data['username'] = self.user.phone_list[index]
self.num += 1
print(data)
with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
res = response.json()
if res['code'] == 200:
response.success()
else:
response.failure(res['msg'])
print(response.json())
def on_stop(self):
print("用户收尾")
class MyUser(HttpUser):
tasks = [MyPostTask]
host = 'http://localhost:8080'
wait_time = between(0, 0)
phone_list = []
faker = Faker(locale='zh_CN')
for i in range(1000):
phone_list.append(faker.phone_number()+"_" + str(i))
唯一参数化
唯一参数化是指用户传参的时候从数据源中取到的数据唯一,可以借助队列数据结构来实现,利用队列先进先出的特点可以确保从队列中取出的数据唯一,当队列取值为空的时候程序会进入阻塞状态
import os
import queue
from faker import Faker
from locust import TaskSet, task, HttpUser, between
class MyPostTask(TaskSet):
url = 'http://localhost:8080/login1'
def on_start(self):
print("用户初始化")
@task
def get_test(self):
data = {'username': 'hhz', 'pwd': '1234'}
# 从队列中取数据,当取值为空时会进入阻塞状态
phone_number = self.user.queue_data.get()
data['username'] = phone_number
print(data)
with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
res = response.json()
if res['code'] == 200:
response.success()
else:
response.failure(res['msg'])
print(response.json())
def on_stop(self):
print("用户收尾")
class MyUser(HttpUser):
tasks = [MyPostTask]
host = 'http://localhost:8080'
wait_time = between(0, 0)
# 定义一个队列
queue_data = queue.Queue()
faker = Faker(locale='zh_CN')
for i in range(100):
# 数据入队
queue_data.put(faker.phone_number()+"_" + str(i))
if __name__ == '__main__':
os.system("locust -f locust_post_param_unique.py")
Locust 多任务
任务比重控制
多任务对应性能测试场景中就是组合接口的压测。可能需要根据实际场景进行流量控比,比如在电商场景中,用户登录之后 做一次浏览首页,两次或者多次查看商品详情。这样的话可以通过@task装饰器来指定权重,来控制任务的执行概率。如果不指定的话,默认为1:1,指定权重后,权重越大的方法,执行的概率就越大
@task(1)
def login_test(self)...
@task(2)
def index_test(self)...
@task(1)
def shop_test(self)...
多任务按指定顺序运行
任务类中的任务按照先后顺序执行,修改任务类继承自 SequentialTaskSetta,按顺序执行后,权重比例就不生效了
class MyPostTask(SequentialTaskSet):
@task
def test_1(self):...
@task
def test_2(self)...
@task
def test_3(self)...
Locust 性能进阶之FastHttpLocust
官方文档中提到,默认情况下Locust 请求使用的是Python 库中requests 库,这个库被广大的python 开发者所熟悉,但是requests 库性能稍微差点,如果要产生更大的压力的话,官方建议使用FastHttpLocust ,基于geventhttpclient 三方请求库,发送请求的性能可以提升5-6倍。参考链接:
https://docs.locust.io/en/stable/increase-performance.html
使用方法:
1.安装三方库 pip3 install -i https://pypi.douban.com/simple geventhttpclient
2.替换脚本中用户类继承之FastHttpUser
class MyUser(FastHttpUser):
tasks = [MyPostTask]
host = 'http://localhost:8080'
wait_time = between(0, 0)
其他注意地方需要参考文档中对FastHttpSession class 的一些说明,接口支持的类型没有requests 库那么丰富,参数类型也有些不支持大体是一样的。在实际压测中,建议优先使用FastHttpLocust
Locust 脚本压测
官方推荐在linux 机器上执行脚本压测,分为有界面和无界面模式两种压测。需要在linux 机器上安装Python 环境 和Locust 环境
Web 界面的方式启动locust 脚本
在命令行通过脚本命令的方式启动 --host 指定请求的域名
locust -f locust_fast.py --host http://ip:port --web-port 8087
在web界面指定用户的并发数来操作locust
非Web 界面的方式启动locust 脚本
相比于web 界面来说,非web 界面主要是通过命令行参数来指定并发的用户数、以及用户每秒请求增加数,可压测时间。另外非web 界面的方式可以结合C/I 工具自动执行,web 界面只能手动执行
参考文档
https://docs.locust.io/en/stable/running-without-web-ui.html
locust -f locust_fast.py --host http://ip:port --web-port 8087 --headless -u 1000 -r 100 --run-time 2m --logfile locust.log --csv result/user
# 相比于web 界面启动的话 需要指定--headless
# -u 并发数
# -r 用户增长速度
# --run-time 指定运行时间
# --logfile 指定locust 运行日志
# --csv 指定运行的结果目录前缀
分布式压测
当单台压力机产生的压力满足不了压测需求时,可以使用多机分布式压测。由于locust 使用的是单进程发压,只能利用单个CPU核的资源,为了更加充分利用CPU,我们可以在一台机器上启多个进程采用主进程+ 从进程的方式进行启动,也可以采用多台机器进行分布式压测
主进程启动命令
locust -f locust_fast.py --host http://ip:port --web-port 8087 --headless -u 100 -r 10 -t 1m --master --expect-workers 2
# --expect-works 指定从进程个数,当从进程启动个数达到时,才会开始压测
从进程启动命令
locust -f locust_fast.py --host http://ip:port --web-port 8087 --worker --master-host 主进程ip
Locust 对其他协议压测
默认情况下,Locust 只能对HTTP/HTTPS 进行压测,如果需要压测其他协议的接口,需要对Locust 进行扩展,比如TCP、WebSocket等
扩展方法:
- 编写自定义协议客户端类,自己封装相关方法,建立连接,发送和接收数据、关闭连接
- 编辑自定义的User 类,将self.client 指向自定义协议客户端类
- 编写任务类,在任务类中直接调用封装好的客户端代码
参考官方文档地址
https://docs.locust.io/en/stable/testing-other-systems.html#example-writing-an-xml-rpc-user-client
TCP 服务端代码
import socket
import threading
def handle_request(ip_address, new_socket):
print("客户端ip和端口为:", ip_address)
while True:
recv_data = new_socket.recv(1024)
if recv_data:
recv_data = recv_data.decode('utf-8')
print(f"客户端发送的数据是:{recv_data}")
print(f"客户端的ip地址是:{ip_address}")
new_socket.send('请求成功'.encode('utf-8'))
else:
print("客户端已经断开连接")
break
new_socket.close()
if __name__ == '__main__':
# 建立服务端套接字对象
tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定端口
tcp_server_socket.bind(("", 9094))
# 设置监听 设置最大等待连接数,并返回新的套接字
tcp_server_socket.listen(128)
while True:
# 等待接收 并返回新的套接字
new_socket, ip_address = tcp_server_socket.accept()
server_thread = threading.Thread(target=handle_request, args=(ip_address, new_socket))
server_thread.start()
#tcp_server_socket.close()
自定义Locust TCP 协议客户端代码
import time
import socket
from locust import User
class TcpClient(object):
def __init__(self, host, port, request_event):
self.host = host
self.port = port
self._request_event = request_event
# 建立连接
def connect(self):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((self.host, self.port))
# 发送和接收数据
def send_recv(self, data:str):
request_meta = {
"request_type": "xmlrpc",
"name": 'TCP请求',
"start_time": time.time(),
"response_length": 0, # calculating this for an xmlrpc.client response would be too hard
"response": None,
"context": {}, # see HttpUser if you actually want to implement contexts
"exception": None,
}
start_perf_counter = time.perf_counter()
try:
self.s.send(data.encode('utf-8'))
recv_data = self.s.recv(1024)
print('接收到的响应数据是:' + recv_data.decode('utf-8'))
request_meta["response"] = recv_data.decode('utf-8')
request_meta['response_length'] = len(recv_data.decode('utf-8'))
except Exception as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self._request_event.fire(**request_meta)
print('请求成功')
return request_meta["response"]
# 关闭连接
def close(self):
self.s.close()
print('关闭连接成功')
class TCPUser(User):
"""
A minimal Locust user class that provides an XmlRpcClient to its subclasses
"""
abstract = True # dont instantiate this as an actual user when running Locust
def __init__(self, environment):
super().__init__(environment)
self.client = TcpClient(self.host, self.port, request_event=environment.events.request)
任务类测试
import os
from locust import TaskSet, task, between
from locust_tcp import TCPUser
class MyTcpTask(TaskSet):
def on_start(self):
self.client.connect()
@task
def test_tcp(self):
my_data = 'hello testTcp'
self.client.send_recv(my_data)
def on_stop(self):
self.client.close()
# 继承自定义用户类
class MyUser(TCPUser):
tasks = [MyTcpTask]
host = '127.0.0.1'
port = 9094
wait_time = between(2, 2)
if __name__ == '__main__':
os.system('locust -f tcp_test.py')
服务端性能资源监控 Prometheus + node_exporter+grafana
采用docker 的方式部署监控工具,需要先在环境上docker 工具
- 拉取镜像
docker pull prom/prometheus
docker pull grafana/grafana
- 创建容器网络环境
docker create network grafana
- 启动grafana 和 promethues 容器
docker run -d --name=grafana --network grafana -p 3000:3000 grafana/grafana
docker run -d --name=prometheus --network grafana -p 9090:9090 -v ${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus --config.file=/etc/prometheus/prometheus.yml
- 安装node_exporter 下载地址
https://github.com/prometheus/node_exporter/releases/tag/v1.2.2
# 下载完成之后上传服务器解压执行
nohup ./node_exporter &
- 在当前目录下新建prometheus.yml修改prometheus.yml 配置并重启prometheus容器
# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets:
# - alertmanager:9093
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first_rules.yml"
# - "second_rules.yml"
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: "prometheus"
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ["localhost:9090"]
global: 整体参数配置
alerting: 告警通知配置(需要配合alertmanager的服务进行使用。)
rule_files: 规则文件配置
scrape_configs: 监控节点配置。(当然节点配置有很多中类型)
容器重启完成后,可以在浏览器中输入http:ip:9090 ,界面中target 的配置与prometheus.yml 配置一致
- Grafana 中配置 promethues 数据源和 node_exporter dashboard
在浏览器中输入http:ip:3000 默认用户名和密码都是admin,登录之后添加数据源,选择数据源为prometheus
配置数据源相关信息,配置完成之后点击下方save and test ,配置正确的话会显示Data source is working
导入dashboard
输入dashboard地址之后点击load :https://grafana.com/grafana/dashboards/8919
再选择刚刚创建的prometheus 数据源,然后就可以看到我们所监控服务器的一些性能资源