1 介绍
Influx Proxy 是一个基于高可用、一致性哈希的 InfluxDB 集群代理服务,实现了 InfluxDB 高可用集群的部署方案, 具有动态扩/缩容、故障恢复、数据同步等能力。连接到 Influx Proxy 和连接原生的 InfluxDB Server 没有显著区别 (支持的查询语句列表),对上层客户端是透明的,上层应用可以像使用单机的 InfluxDB 一样使用,Influx Proxy 会处理请求的转发,并对各个 InfluxDB 集群节点进行管理。Influx Proxy 基于饿了么开源的 Influx-Proxy, 并进一步开发和优化,支持了更多的特性,移除了 Python、Redis 依赖,解决了受限于一个数据库、需要额外配置 KEYMAPS 、数据负载不均衡的问题。
开源地址:
https://github.com/chengshiwen/influx-proxy/tree/influxdb-v2
项目支持influxdb-v1和influxdb-v2
1.1 Influx Proxy架构
- client:influxdb-java、influxdb-shell (influx)、curl、浏览器等客户端
- load balance:负载均衡,如· F5、Nginx、LVS、HAProxy 等
- influx-proxy:influx-proxy 实例,架构示意图部署了两个 influx-proxy 实例
- circle:一致性哈希环(circle),一个 circle 包含了若干个 influxdb 实例,共同存储了一份全量的数据,即每个 circle 都是全量数据的一个副本,各个 circle 数据互备。不同 circle 不能包含相同 influxdb 实例,每个 circle 包含的 influxdb 实例个数可以不相等。circle 只是一种逻辑划分,无实体存在,架构示意图配置了三个 circle
- influxdb:influxdb 实例,以 url 进行区分,可以部署在同一服务器上以不同端口运行多个实例,一个 influxdb 实例只存储了一份全量数据的一部分数据
1.2 设计原理
1.2.1 一致性哈希原理
原理文章:(https://leehao.me/%E4%B8%80%E8%87%B4%E6%80%A7Hash-Consistent-Hashing-%E5%8E%9F%E7%90%86%E5%89%96%E6%9E%90/)
- 一致性哈希算法解决了分布式环境下机器扩缩容时,简单的取模运算导致数据需要大量迁移的问题
- 一致性哈希算法能达到较少的机器数据迁移成本,实现快速扩缩容
- 通过虚拟节点的使用,一致性哈希算法可以均匀分担机器的数据负载
1.2.2 一致性哈希 circle 设计
- 一个 circle 是一个逻辑上的一致性哈希环,包含少数的物理节点和更多数的虚拟节点
- 一个 circle 中的所有 influxdb 实例对应了这个一致性哈希环的物理节点
1.2.3 数据存储位置
每个 circle 维护了一份全量数据,一个 influxdb 实例上的数据只是从属 circle 数据的一部分
每个 circle 数据存储位置计算:
shard_key(db,measurement) + hash_key(idx) + influxdb实例列表 + 一致性哈希算法 => influxdb实例
当 influxdb实例列表、hash_key、shard_key 不发生改变时,db,measurement 将只会唯一对应一台 influxdb实例
当 influxdb实例列表、hash_key、shard_key 发生改变时,需要对少量机器数据进行迁移,即 重新平衡 (rebalance)
1.3 请求流程
1.3.1 写请求
- client 请求 load balance 地址
- load balance 根据负载均衡算法选择一个 influx-proxy 转发请求
- influx-proxy 收到请求,根据请求中 db 和 measurement 信息,每个 circle 使用一致性哈希算法计算出一个 influxdb 实例,并将请求转发给这些 influxdb 实例
- influxdb 实例处理请求,写入数据
- 若存在 influxdb 实例宕掉,或者网络、存储故障导致无法 influxdb 无法写入,则 influx-proxy 会将数据写入到缓存文件中,并直到 influxdb 实例恢复后重新写入
1.3.2 读请求
- client 请求 load balance 地址
- load balance 根据负载均衡算法选择一个 influx-proxy 转发请求
- 若请求中带有 db 和 measurement 信息,使用一致性哈希算法计算出所有 circle 对应的 influxdb 实例,并随机选择一个健康的 influxdb 实例,将请求转发给这个实例
- 若请求中只带有 db 信息,则判断为数据库相关的集群查询语句,并将请求转发给该 circle 的所有 influxdb 实例
- influxdb 实例处理请求,读出数据,返回给 influx-proxy
- 若是单个实例返回数据,则直接返回 client;若是多个实例返回数据,则合并后返回 client
2 安装部署
2.1 下载配置文件
https://github.com/chengshiwen/influx-proxy/tree/influxdb-v2/docker/quick
将docker-compose.yml、proxy.json、setup.sh下载到本地
wget https://raw.githubusercontent.com/chengshiwen/influx-proxy/refs/heads/influxdb-v2/docker/quick/docker-compose.yml
wget https://raw.githubusercontent.com/chengshiwen/influx-proxy/refs/heads/influxdb-v2/docker/quick/proxy.json
wget https://raw.githubusercontent.com/chengshiwen/influx-proxy/refs/heads/influxdb-v2/docker/quick/setup.sh
2.2 下载镜像
docker pull chengshiwen/influx-proxy:3.0.0-preview
docker pull influxdb:2.7
2.3 启动服务
docker-compose up -d --scale influx-proxy=0
bash setup.sh
docker-compose up -d
2.4 当前架构
该架构相当简单,一个InfluxDB代理实例和两个一致的哈希环,分别有两个InfluxDB实例。代理应该将带有组织、bucket和度量的HTTP请求指向两个圆圈和四个InfluxDB服务器。
设置如下所示:
2.5 Write
/api/v2/write v2 supported
2.6 Query
/api/v2/query v2 supported
3 数据测试
3.1 下载脚本
wget https://raw.githubusercontent.com/chengshiwen/influx-proxy/refs/heads/influxdb-v2/script/write.sh
wget https://raw.githubusercontent.com/chengshiwen/influx-proxy/refs/heads/influxdb-v2/script/query.sh
3.2 执行写入脚本
3.3 执行查询脚本
4 配置prometheus使用Influx Proxy
1、配置telegraf
vim /etc/telegraf/telegraf.conf
#配置HTTP侦听器的端口及其侦听路径
[[inputs.http_listener_v2]]
# ## Address and port to host HTTP listener on
service_address = ":12345"
# ## Paths to listen to.
paths = ["/receive"]
# ## Data format to consume.
data_format = "prometheusremotewrite"
#添加InfluxDB输出插件
[[outputs.influxdb_v2]]
urls = ["http://192.168.137.128:7076"]
# token = token没有,给注释掉了
organization = "myorg"
bucket = "mybucket"
2、配置prometheus
remote_write:
- url: "http://192.168.137.128:12345/receive"
3、重新加载prometheus
curl -X POST http://192.168.137.131:9090/-/reload
数据写入的时候一会功夫4G内存就被占满,当把telegraf停掉,内存又降下来了
4、数据查看
curl -X POST 'http://127.0.0.1:7076/api/v2/query?org=myorg' -H 'Content-type: application/json' -d '{"query": "from(bucket:\"mybucket\") |> range(start:0) |> filter(fn: (r) => r._measurement == \"prometheus_remote_write\" and r._field == \"znode_count\")"}'