0
点赞
收藏
分享

微信扫一扫

Python写feign接口工具

查拉图斯特拉你和他 2021-09-30 阅读 79

每次想调用feign接口的时候都要上eureka查一下ip和端口,所以封装了两个请求方法,对输入的url提取服务名并自动查找eureka上的ip端口,方便接口测试,也可以在接口自动化框架中使用

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
from urllib.parse import urljoin


class ServiceNotFoundError(Exception):
    """ 在eureka找不到服务时抛出这个异常 """
    pass


class NoInstanceError(Exception):
    """ 服务没有实例,抛出这个异常 """
    pass


# eureka地址
env_test = 'http://{地址}:{端口}'

active = env_test

# 防止建立过多连接,使用session来请求接口
session = requests.Session()
session.keep_alive = False

# 最多重试3次,间隔0.5s
adapter = HTTPAdapter(max_retries=Retry(connect=3, backoff_factor=0.5))
session.mount('http://', adapter)
session.mount('https://', adapter)


def get_service_host(service):
    """
    从eureka获取指定服务的ip和端口,如果有多个节点,只返回第一个
    :param service: 注册到eureka服务名,如 draw-course-backend
    :return: 第一个节点的服务ip和端口,如 http://172.16.51.6:8889/
    """
    headers = {'Accept': 'application/json'}
    res = session.get(urljoin(active, '/eureka/apps/%s' % service), headers=headers)
    if res.status_code != 200:
        raise ServiceNotFoundError('The "%s" service maybe not be registered in eureka!' % service)
    content = str(res.content, encoding='utf-8')
    content = json.loads(content)
    instance = content['application']['instance']
    if not instance:
        raise NoInstanceError('No "%s" service instance available' % service)
    instance = instance[0]
    return instance['homePageUrl']


def translate_url(url):
    """
    把链接的服务名转换成ip地址
    :param url: /draw-course-backend/a/b/c (或 draw-course-backend/a/b/c)
    :return: 新的链接,如 http://172.16.51.6:8889/a/b/c
    """
    routes = str(url).split('/')
    service = routes.pop(0)
    if service == '':
        service = routes.pop(0)
    host = get_service_host(service)
    url = urljoin(host, '/'.join(routes))
    print('translate_url: %s' % url)
    return url


def fget(url, **kwargs):
    """ 对feign接口发起get请求 """
    url = translate_url(url)
    try:
        return session.get(url, timeout=10, **kwargs)
    except Exception as e:
        print(e)


def fpost(url, **kwargs):
    """ 对feign接口发起post请求 """
    url = translate_url(url)
    try:
        return session.post(url, timeout=10, **kwargs)
    except Exception as e:
        print(e)
举报

相关推荐

0 条评论