0
点赞
收藏
分享

微信扫一扫

【问题】记录spark查询ES,数据重复的问题

泠之屋 2022-09-29 阅读 156

真实环境遇到spark查询ES,出现数据重复的现象。记录一下整个背景和解决问题过程。记录过程比较简单,真实排查过程艰难定位到最终原因

记录spark查询ES,数据重复的问题

1.环境

ES 7.10.0 Spark 2.3.0 scala 2.11 ES极限网关:[GATEWAY] 1.5.0_SNAPSHOT, 2021-10-27 11:22:59, 10e5f97

ES索引:

{
  "dwd_order" : {
    "aliases" : { },
    "mappings" : {
      "dynamic" : "strict",
      "properties" : {
        "create_time" : {
          "type" : "long"
        },
        "custom_uid" : {
          "type" : "keyword"
        }
      }
    },
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "refresh_interval" : "20s",
        "number_of_shards" : "4",
        "provided_name" : "dwd_order",
        "creation_date" : "1663855284830",
        "number_of_replicas" : "1",
        "uuid" : "eZoOGZsqTXudfjD6dOMqPw"
      }
    }
  }
}

2.大体逻辑:

执行spark sql查询ES。大体逻辑

sparkConf.setAll(nileConfig.sparkConfig)
      .set("nile.framework.config", config)

val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

val df = sparkSession.read
  .format("org.elasticsearch.spark.sql")
  .schema(StructType(sparkField.toArray))
  .options(sourceConfig.config)
.load(s"${sourceNamespace.database}/${sourceNamespace.table}")
              
sparkSession.sql(s"select field_a,field_b,field_c from dwd_order where create_time = 1664177748 AND id =  123")

3.问题现象:

dwd_order索引中只有一条数据符合create_time = 1664177748 AND id = 123条件。

spark查询es根据索引dwd_order的shard数量,产生4个task。结果每个task都查到符号条件“create_time = 1664177748 AND id = 123”的数据。

排查问题过程:

  1. 根据shard个数,固定spark task个数,并添加日志,输出查询结果。 确认每个task输出同样数据
  2. 怀疑es依赖依赖jar 7.10.0版本问题。网上没有找到相关issue
  3. 怀疑es问题,确认es架构。访问es先经过LB -> ES极限网关 -> ES。从后往前分别配置,查看哪个环节有问题。
    • 直接配置ES节点IP:Port,访问数据正常,没有数据重复情况
    • 配置ES代理“极限网关”的ip:port,查询结果有重复现象。锁定产出问题场景

解决

升级极限网关版本。

举报

相关推荐

0 条评论