0
点赞
收藏
分享

微信扫一扫

locust 使用

静悠 2022-04-18 阅读 52
python

Locust 使用

Locust 简介:

Locust 是一个开源性能测试工具,使用Python 代码来定义用户行为,用它可以模拟百万计的并发用户访问你的系统,并且支持分布式压测。官网地址 https://www.locust.io/

Locust 安装

  1. 虚拟环境的安装(虚拟环境是为了实现Python 环境的隔离,解决三方库不同版本之间的依赖冲突问题,保持环境的一致性)
    第一种方式是利用pycharm 工具来设置虚拟环境
    在这里插入图片描述
    第二种方式是利用python 3.6 版本以后自带的venv 工具
    ①在pycharm 终端输入命令 python -m venv locust 其中前面是固定写法 locust 是自定义虚拟环境名字可随意书写,新建完成之后会在当前目录下多出来locust 目录
    在这里插入图片描述
    ②需要激活刚刚新建的虚拟环境 cd locust/Script 目录下 ,输入activate 命令激活虚拟环境
    在这里插入图片描述

激活之后 这里会显示当前虚拟环境的名字,也可以用pip list 来验证当前的环境
③退出虚拟环境的话,终端输入deactivate

  1. 在虚拟环境中安装Locust
    通过pip 命令安装 pip install locust
    或者指定国内安装源安装 pip install -i https://pypi.douban.com/simple locust
    环境安装完成之后验证环境:locust -V

Locust 脚本书写

一个Locust 脚本可以定义两个类

  1. 任务类继承TaskSet 编写具体的任务
  2. 用户类 继承HttpUser 类似一个控制器
    任务类需要继承自TaskSet 类,任务类包含处理具体任务的一个函数或者多个函数,函数需要用@task 标识出来。
    任务类中固定的on_start() 方式用于每个用户在初始化的时候只执行一次,必须用户的登录在所有的任务只需要执行一次就可以放在on_start() 里面来实现
    on_stop() 用户级别的收尾操作,且每个用户只执行一次,可以用于用户任务结束后的一些环境数据清理动作
    用户类需要继承自HttpUser类,用户类中常用属性:
  • tasks 变量类型为列表,指定运行的任务类
  • host 指定运行的服务器的ip 和端口
  • wait_time 指定用户每次请求的间隙时间,一般结合between() 函数来使用
from locust import TaskSet, task, HttpUser, between

class MyGetTask(TaskSet):
    url = 'http://localhost:8080/user'

    def on_start(self):
        print("用户初始化")

    @task
    def get_test(self):
        query_param = {'name': 'hhz'}
        response = self.client.get(self.url, name='查询用户接口', params=query_param, timeout=10)
        print(response.json())

    def on_stop(self):
        print("用户收尾")


class MyUser(HttpUser):
    tasks = [MyGetTask]

    host = 'http://localhost:8080'
    wait_time = between(2,  2)

脚本的运行
第一种在终端通过命令行 locust -f locusefile.py 来指定运行的文件
程序启动成功之后在浏览器输入localhost:8089
在这里插入图片描述

第二种在脚本文件中利用os.system() 方法来执行脚本命令
if name == ‘main’:
os.system(“locust -f locust_get.py”)

Locust 发送http 请求

locust 发送http 请求是基于Python request库,在具体的任务类中通过self.client.get() 或者self.client.post() 发送get 请求或者post 请求

发送get请求

@task
def get_test(self):
  query_param = {'name': 'hhz'}
  with self.client.get(self.url, name='查询用户接口', params=query_param, timeout=10) as response:
      print(response.json())

发送post 请求

@task
def post_test(self):
    data = {'username': 'hhz', 'pwd': '1234'}

    # catch_response 自定义请求的成功与否
    with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
        res = response.json()
        if res['code'] == 200:
            response.success()
        else:
            response.failure(res['msg'])
        print(response.json())

请求中参数说明

参数值说明
url对于一些query 参数可以用?param=value 的形式拼接到url中
name可选参数,定义标签名字,在Locust 的web监控面板里面可以体现出来
params可选参数,传入字典类型 ,一般用于接口请求中query 参数的传值
data可选参数,传入字典类型。post 请求中用于表单的提交,使用此参数会在请求头里面默认携带Content-Type=application/x-www-form-urlencoded
json可选参数,传入字典类型,用于post 请求body体中json 数据的发送,使用此参数会在请求头中默认携带Content-Type=application/json
headers可选参数,传入字典类型,请求头信息
cookies可选参数,传入字典类型,像请求中传入cookie 信息
files可选参数,传入字典类型({‘文件参数’:open(’’,‘r’)}), 上传文件
verify可选参数,如果为True 则https 请求会校验证书
catch_response为True可以自定义请求是否成功,locust 默认根据响应状态码是否为200来判定请求是否成功。实际业务还应根据具体的业务code 来判断请求是否成功。

关与response 常用的属性和方法

response.json()返回字典类型的响应数据,要求接口的响应数据为json 类型的数据
response.content返回响应内容的二进制数据
response.text返回字符串

Locust参数化

参数化的思想是首先准备一个数据源,数据源可以自定义列表,也可以从数据库或者文件中读取。然后再在需要用的地方根据一定的规则去数据源取数据。
在参数化这个地方可以借鉴python 三方库faker pip install faker ,可以生成随机的姓名、手机号、地址、ip 等等信息 详细使用可以参考 https://www.cnblogs.com/pywen/p/14245369.html

随机参数化

参数值随机从数据源中获取

import os
import random
from faker import Faker
from locust import TaskSet, task, HttpUser, between


class MyPostTask(TaskSet):
    url = 'http://localhost:8080/login1'

    def on_start(self):
        print("用户初始化")
        self.phone_len = len(self.user.phone_list)

    @task
    def get_test(self):
        data = {'username': 'hhz', 'pwd': '1234'}

        # 产生随机数
        index = random.randint(0, self.phone_len-1)
        # 通过self.user.变量名  来获取User 类中定义的变量
        data['username'] = self.user.phone_list[index]
        print(data)
        with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
            res = response.json()
            if res['code'] == 200:
                response.success()
            else:
                response.failure(res['msg'])
            print(response.json())

    def on_stop(self):
        print("用户收尾")


class MyUser(HttpUser):
    tasks = [MyPostTask]
    host = 'http://localhost:8080'
    wait_time = between(2,  2)

    phone_list = []
    faker = Faker(locale='zh_CN')
    # 构造手机号数据源
    for i in range(1000):
        phone_list.append(faker.phone_number()+"_" + str(i))

顺序参数化

参数值按照顺序从数据源中循环获取

import os
from faker import Faker
from locust import TaskSet, task, HttpUser, between


class MyPostTask(TaskSet):
    url = 'http://localhost:8080/login1'

    def on_start(self):
        print("用户初始化")
        self.phone_len = len(self.user.phone_list)
        self.num = self.phone_len

    @task
    def get_test(self):
        data = {'username': 'hhz', 'pwd': '1234'}

        index = self.num % self.phone_len
        data['username'] = self.user.phone_list[index]
        self.num += 1
        print(data)
        with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
            res = response.json()
            if res['code'] == 200:
                response.success()
            else:
                response.failure(res['msg'])
            print(response.json())

    def on_stop(self):
        print("用户收尾")


class MyUser(HttpUser):
    tasks = [MyPostTask]
    host = 'http://localhost:8080'
    wait_time = between(0,  0)

    phone_list = []
    faker = Faker(locale='zh_CN')
    for i in range(1000):
        phone_list.append(faker.phone_number()+"_" + str(i))

唯一参数化

唯一参数化是指用户传参的时候从数据源中取到的数据唯一,可以借助队列数据结构来实现,利用队列先进先出的特点可以确保从队列中取出的数据唯一,当队列取值为空的时候程序会进入阻塞状态

import os
import queue
from faker import Faker
from locust import TaskSet, task, HttpUser, between


class MyPostTask(TaskSet):
    url = 'http://localhost:8080/login1'

    def on_start(self):
        print("用户初始化")


    @task
    def get_test(self):
        data = {'username': 'hhz', 'pwd': '1234'}

        # 从队列中取数据,当取值为空时会进入阻塞状态
        phone_number = self.user.queue_data.get()
        data['username'] = phone_number
        print(data)
        with self.client.post(self.url, name='用户登录接口', json=data, timeout=10, catch_response=True) as response:
            res = response.json()
            if res['code'] == 200:
                response.success()
            else:
                response.failure(res['msg'])
            print(response.json())

    def on_stop(self):
        print("用户收尾")


class MyUser(HttpUser):
    tasks = [MyPostTask]
    host = 'http://localhost:8080'
    wait_time = between(0,  0)
    # 定义一个队列
    queue_data = queue.Queue()
    faker = Faker(locale='zh_CN')
    for i in range(100):
        # 数据入队
        queue_data.put(faker.phone_number()+"_" + str(i))


if __name__ == '__main__':
    os.system("locust -f locust_post_param_unique.py")

Locust 多任务

任务比重控制

多任务对应性能测试场景中就是组合接口的压测。可能需要根据实际场景进行流量控比,比如在电商场景中,用户登录之后 做一次浏览首页,两次或者多次查看商品详情。这样的话可以通过@task装饰器来指定权重,来控制任务的执行概率。如果不指定的话,默认为1:1,指定权重后,权重越大的方法,执行的概率就越大

@task(1)
def login_test(self)...

@task(2)
def index_test(self)...

@task(1)
def shop_test(self)...

多任务按指定顺序运行

任务类中的任务按照先后顺序执行,修改任务类继承自 SequentialTaskSetta,按顺序执行后,权重比例就不生效了

class MyPostTask(SequentialTaskSet):

    @task
    def test_1(self):...
    
    @task
    def test_2(self)...
    
    @task
    def test_3(self)...

Locust 性能进阶之FastHttpLocust

官方文档中提到,默认情况下Locust 请求使用的是Python 库中requests 库,这个库被广大的python 开发者所熟悉,但是requests 库性能稍微差点,如果要产生更大的压力的话,官方建议使用FastHttpLocust ,基于geventhttpclient 三方请求库,发送请求的性能可以提升5-6倍。参考链接:
https://docs.locust.io/en/stable/increase-performance.html
使用方法:
1.安装三方库 pip3 install -i https://pypi.douban.com/simple geventhttpclient
2.替换脚本中用户类继承之FastHttpUser

class MyUser(FastHttpUser):
    tasks = [MyPostTask]
    host = 'http://localhost:8080'
    wait_time = between(0,  0)

其他注意地方需要参考文档中对FastHttpSession class 的一些说明,接口支持的类型没有requests 库那么丰富,参数类型也有些不支持大体是一样的。在实际压测中,建议优先使用FastHttpLocust

Locust 脚本压测

官方推荐在linux 机器上执行脚本压测,分为有界面和无界面模式两种压测。需要在linux 机器上安装Python 环境 和Locust 环境

Web 界面的方式启动locust 脚本

在命令行通过脚本命令的方式启动 --host 指定请求的域名
locust -f locust_fast.py --host http://ip:port --web-port 8087
在web界面指定用户的并发数来操作locust

非Web 界面的方式启动locust 脚本

相比于web 界面来说,非web 界面主要是通过命令行参数来指定并发的用户数、以及用户每秒请求增加数,可压测时间。另外非web 界面的方式可以结合C/I 工具自动执行,web 界面只能手动执行
参考文档
https://docs.locust.io/en/stable/running-without-web-ui.html

locust -f locust_fast.py --host http://ip:port --web-port 8087 --headless -u 1000 -r 100 --run-time 2m --logfile locust.log --csv result/user

# 相比于web 界面启动的话 需要指定--headless 
# -u 并发数 
# -r 用户增长速度 
# --run-time 指定运行时间 
# --logfile 指定locust 运行日志
# --csv 指定运行的结果目录前缀

分布式压测

当单台压力机产生的压力满足不了压测需求时,可以使用多机分布式压测。由于locust 使用的是单进程发压,只能利用单个CPU核的资源,为了更加充分利用CPU,我们可以在一台机器上启多个进程采用主进程+ 从进程的方式进行启动,也可以采用多台机器进行分布式压测
主进程启动命令

locust -f locust_fast.py --host http://ip:port --web-port 8087 --headless -u 100 -r 10 -t 1m --master --expect-workers 2

# --expect-works 指定从进程个数,当从进程启动个数达到时,才会开始压测

从进程启动命令

locust -f locust_fast.py --host http://ip:port --web-port 8087 --worker --master-host 主进程ip

Locust 对其他协议压测

默认情况下,Locust 只能对HTTP/HTTPS 进行压测,如果需要压测其他协议的接口,需要对Locust 进行扩展,比如TCP、WebSocket等
扩展方法:

  1. 编写自定义协议客户端类,自己封装相关方法,建立连接,发送和接收数据、关闭连接
  2. 编辑自定义的User 类,将self.client 指向自定义协议客户端类
  3. 编写任务类,在任务类中直接调用封装好的客户端代码
    参考官方文档地址
    https://docs.locust.io/en/stable/testing-other-systems.html#example-writing-an-xml-rpc-user-client
    TCP 服务端代码
import socket
import threading


def handle_request(ip_address, new_socket):
    print("客户端ip和端口为:", ip_address)

    while True:
        recv_data = new_socket.recv(1024)
        if recv_data:
            recv_data = recv_data.decode('utf-8')
            print(f"客户端发送的数据是:{recv_data}")
            print(f"客户端的ip地址是:{ip_address}")
            new_socket.send('请求成功'.encode('utf-8'))
        else:
            print("客户端已经断开连接")
            break

    new_socket.close()


if __name__ == '__main__':
    # 建立服务端套接字对象
    tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 绑定端口
    tcp_server_socket.bind(("", 9094))

    # 设置监听 设置最大等待连接数,并返回新的套接字
    tcp_server_socket.listen(128)

    while True:
        # 等待接收 并返回新的套接字
        new_socket, ip_address = tcp_server_socket.accept()
        server_thread = threading.Thread(target=handle_request, args=(ip_address, new_socket))

        server_thread.start()

    #tcp_server_socket.close()

自定义Locust TCP 协议客户端代码

import time
import socket

from locust import User


class TcpClient(object):

    def __init__(self, host, port, request_event):
        self.host = host
        self.port = port
        self._request_event = request_event

    # 建立连接
    def connect(self):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect((self.host, self.port))

    # 发送和接收数据
    def send_recv(self, data:str):
        request_meta = {
            "request_type": "xmlrpc",
            "name": 'TCP请求',
            "start_time": time.time(),
            "response_length": 0,  # calculating this for an xmlrpc.client response would be too hard
            "response": None,
            "context": {},  # see HttpUser if you actually want to implement contexts
            "exception": None,
        }
        start_perf_counter = time.perf_counter()
        try:
            self.s.send(data.encode('utf-8'))
            recv_data = self.s.recv(1024)
            print('接收到的响应数据是:' + recv_data.decode('utf-8'))
            request_meta["response"] = recv_data.decode('utf-8')
            request_meta['response_length'] = len(recv_data.decode('utf-8'))
        except Exception as e:
            request_meta["exception"] = e
        request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
        self._request_event.fire(**request_meta)
        print('请求成功')
        return request_meta["response"]

    # 关闭连接
    def close(self):
        self.s.close()
        print('关闭连接成功')


class TCPUser(User):
    """
    A minimal Locust user class that provides an XmlRpcClient to its subclasses
    """

    abstract = True  # dont instantiate this as an actual user when running Locust

    def __init__(self, environment):
        super().__init__(environment)
        self.client = TcpClient(self.host, self.port, request_event=environment.events.request)

任务类测试

import os

from locust import TaskSet, task, between

from locust_tcp import TCPUser


class MyTcpTask(TaskSet):

    def on_start(self):
        self.client.connect()

    @task
    def test_tcp(self):
        my_data = 'hello testTcp'
        self.client.send_recv(my_data)

    def on_stop(self):
        self.client.close()

# 继承自定义用户类
class MyUser(TCPUser):
    tasks = [MyTcpTask]
    host = '127.0.0.1'
    port = 9094
    wait_time = between(2, 2)


if __name__ == '__main__':
    os.system('locust -f tcp_test.py')

服务端性能资源监控 Prometheus + node_exporter+grafana

采用docker 的方式部署监控工具,需要先在环境上docker 工具

  1. 拉取镜像
docker pull prom/prometheus 

docker pull grafana/grafana
  1. 创建容器网络环境
docker create network grafana
  1. 启动grafana 和 promethues 容器
 docker run -d --name=grafana --network grafana -p 3000:3000 grafana/grafana 
 
 docker run -d --name=prometheus --network grafana -p 9090:9090 -v ${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus --config.file=/etc/prometheus/prometheus.yml
  1. 安装node_exporter 下载地址
    https://github.com/prometheus/node_exporter/releases/tag/v1.2.2
# 下载完成之后上传服务器解压执行
nohup ./node_exporter &
  1. 在当前目录下新建prometheus.yml修改prometheus.yml 配置并重启prometheus容器
# my global config
global:
  scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "prometheus"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ["localhost:9090"]

global: 整体参数配置
alerting: 告警通知配置(需要配合alertmanager的服务进行使用。)
rule_files: 规则文件配置
scrape_configs: 监控节点配置。(当然节点配置有很多中类型)

容器重启完成后,可以在浏览器中输入http:ip:9090 ,界面中target 的配置与prometheus.yml 配置一致
在这里插入图片描述

  1. Grafana 中配置 promethues 数据源和 node_exporter dashboard
    在浏览器中输入http:ip:3000 默认用户名和密码都是admin,登录之后添加数据源,选择数据源为prometheus
    在这里插入图片描述

配置数据源相关信息,配置完成之后点击下方save and test ,配置正确的话会显示Data source is working
在这里插入图片描述

导入dashboard
在这里插入图片描述

输入dashboard地址之后点击load :https://grafana.com/grafana/dashboards/8919

再选择刚刚创建的prometheus 数据源,然后就可以看到我们所监控服务器的一些性能资源
在这里插入图片描述

举报

相关推荐

0 条评论