首先安装 gRPC、日志和 Locust 相关的依赖库(例如将下列内容写入 requirements.txt,然后执行 pip install -r requirements.txt):
google
protobuf
grpcio
grpcio-tools
grpc_interceptor
loguru
locust==2.15.1
然后创建一个grpc_user.py文件,内容如下:
import time
from typing import Any, Callable
import grpc
import grpc.experimental.gevent as grpc_gevent
from grpc_interceptor import ClientInterceptor
from locust import User
from locust.exception import LocustError
from loguru import logger
# Patch gRPC so that its blocking calls cooperate with gevent.
# NOTE(review): Locust users presumably run as gevent greenlets, so an
# unpatched gRPC call would block all of them — confirm. The gRPC docs ask
# for this to run before any gRPC object is created, hence import time.
grpc_gevent.init_gevent()
class LocustInterceptor(ClientInterceptor):
    """Client interceptor that reports each gRPC call to Locust.

    Every intercepted call is timed and then published through the
    environment's ``events.request`` hook, so it shows up in Locust's
    statistics as a request of type ``"grpc"``.
    """

    def __init__(self, environment, *args, **kwargs):
        """Remember the Locust environment used to fire request events.

        Args:
            environment: Object exposing ``events.request.fire(...)``.
            *args: Forwarded unchanged to ``ClientInterceptor``.
            **kwargs: Forwarded unchanged to ``ClientInterceptor``.
        """
        super().__init__(*args, **kwargs)
        self.env = environment

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.ClientCallDetails,
    ):
        """Invoke the RPC, measure it, and fire a Locust request event.

        Args:
            method: Callable that performs the actual RPC (or the next
                interceptor in the chain).
            request_or_iterator: Request payload handed to ``method``.
            call_details: Call metadata; ``call_details.method`` is used
                as the request name in the fired event.

        Returns:
            Whatever ``method`` returned, or ``None`` if ``method`` itself
            raised ``grpc.RpcError``.

        Note:
            ``grpc.RpcError`` is not re-raised; it is reported through the
            ``exception`` field of the event. Any other exception
            propagates and no event is fired.
        """
        response = None
        exception = None
        response_length = 0
        start_perf_counter = time.perf_counter()
        try:
            response = method(request_or_iterator, call_details)
            # Resolve the call once and reuse the message for both the
            # status check and the size, instead of calling .result() twice.
            response_result = response.result()
            # NOTE(review): assumes the response message carries an
            # application-level ``code`` field where 0 means success —
            # confirm against the .proto definition.
            if response_result.code != 0:
                exception = Exception("code is not 0")
            else:
                logger.debug("success")
                # The size is recorded only for successful responses;
                # failures are reported with response_length == 0.
                response_length = response_result.ByteSize()
        except grpc.RpcError as e:
            exception = e
        self.env.events.request.fire(
            request_type="grpc",
            name=call_details.method,
            # perf_counter() is in seconds; the event carries milliseconds.
            response_time=(time.perf_counter() - start_perf_counter) * 1000,
            response_length=response_length,
            response=response,
            context=None,
            exception=exception,
        )
        return response
class GrpcUser(User):
    """Abstract Locust user that owns a gRPC stub on an intercepted channel.

    Subclasses must set ``host`` and ``stub_class``; the constructor then
    builds ``self.stub`` over an insecure channel wrapped with
    ``LocustInterceptor``.
    """

    # Marked abstract so this base class is not itself treated as a user.
    abstract = True
    # Stub class to instantiate with the channel; set by subclasses.
    stub_class = None

    def __init__(self, environment):
        """Validate the configuration and create the channel and stub.

        Args:
            environment: Locust environment, passed to ``User`` and to the
                interceptor.

        Raises:
            LocustError: If ``host`` or ``stub_class`` is ``None``.
        """
        super().__init__(environment)

        # Checked in this order so a missing host is reported first.
        for required in ("host", "stub_class"):
            if getattr(self, required) is None:
                raise LocustError(f"You must specify the {required}.")

        plain_channel = grpc.insecure_channel(self.host)
        self._channel = grpc.intercept_channel(
            plain_channel,
            LocustInterceptor(environment=environment),
        )
        self.stub = self.stub_class(self._channel)
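最后编写压测文件(引用 grpc_user.py),这里假设文件名为 xxx.py。其中 xx_pb2、xx_pb2_grpc 是用 grpcio-tools 根据 .proto 文件生成的模块(例如:python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. xx.proto)。内容如下: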
from locust import task
import xx_pb2, xx_pb2_grpc
import grpc_user
class MyGrpcUser(grpc_user.GrpcUser):
    """Example load-test user that calls ``XxxMethod`` on the target service."""

    # Target server address (placeholder value; replace before running).
    host = "ip:port"
    # Generated stub class that the base class instantiates with its channel.
    stub_class = xx_pb2_grpc.XxxStub

    @task
    def get_charging_gun_count(self):
        """Send one default-constructed ``Req`` message through ``XxxMethod``."""
        self.stub.XxxMethod(xx_pb2.Req())
之后就可以运行压测了:
locust -f xxx.py