0
点赞
收藏
分享

微信扫一扫

Hive数仓ETL过程

栖桐 2022-05-06 阅读 75

目录

Hive数仓ETL过程

  1. Synchronize industrial chain data to HDFS
# Copy the industrial_chain directory from the first HDFS namenode (/aidata/...) to the second one (/data/aidata/...).
hadoop distcp hdfs://10.xx.xx.x:8020/aidata/industrial_chain/ hdfs://10.xx.xx.xx:8020/data/aidata/industrial_chain/
  2. Create the ODS table
-- ODS landing table: one raw record per row in `content` (parsed later with
-- GET_JSON_OBJECT), partitioned by load date (dt) and by ic.
CREATE TABLE IF NOT EXISTS ods_b10000001_00003.ods_p_industry_chain_company(
  `content` string)
PARTITIONED BY (dt string, ic string)
row format delimited fields terminated by '\001';
  3. Load the data into ODS
-- Load one day's files for one ic into the matching (dt, ic) partition.
-- OVERWRITE replaces whatever the partition already contains.
-- NOTE(review): {yyyyMMdd} and {ic} look like template placeholders to be substituted before running — confirm.
LOAD DATA INPATH '/data/aidata/industrial_chain/{yyyyMMdd}/{ic}' 
OVERWRITE INTO TABLE ods_b10000001_00003.ods_p_industry_chain_company PARTITION(dt='{yyyyMMdd}',ic='{ic}');
  4. Process the ODS data into DWD
-- DWD table: one row per company/industry pair extracted from the raw ODS
-- records, partitioned by ic.
CREATE TABLE IF NOT EXISTS dwd_b10000001_00003.dwd_p_industry_chain_company (
company_id string comment "企业唯一标识",
industry_code string comment "产业编码",
industry_score string comment "产业评分"
) COMMENT "产业链" partitioned by (ic string)
row format delimited fields terminated by '\001';

-- The insert below names no static partition value, so dynamic partitioning
-- must be allowed to run in nonstrict mode.
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Rebuild the DWD partitions from one day's ODS data. The last SELECT column
-- (a.ic) supplies the dynamic partition value.
INSERT OVERWRITE TABLE dwd_b10000001_00003.dwd_p_industry_chain_company PARTITION (ic)
SELECT
GET_JSON_OBJECT(a.content, '$.source.company_id') as company_id,
-- First nodeCode entry with the dashes removed.
REPLACE(GET_JSON_OBJECT(a.content, '$.fields.nodeCode[0]'),'-','') AS industry_code,
GET_JSON_OBJECT(a.content, '$.score') as industry_score,
a.ic
FROM ods_b10000001_00003.ods_p_industry_chain_company a
WHERE a.dt='{yyyyMMdd}';
  5. Shell script that automates the steps above
#!/bin/bash

# Command strings that the logging helpers run through `eval` to build a
# "[timestamp][PID:n]" prefix. The timestamp is taken at eval time; the PID
# ($$) is fixed here, when NOW is assigned.
anynowtime="date +'%Y-%m-%d %H:%M:%S'"
NOW="echo [\`$anynowtime\`][PID:$$]"

# $1 - first date to process (yyyyMMdd); defaults to yesterday.
start_dt="${1:-$(date +"%Y%m%d" -d "-1day")}"

# $2 - last date to process (yyyyMMdd); defaults to start_dt so that a run
# with zero or one argument processes exactly one day. (Falling back to "$1"
# would leave end_dt empty when no argument is given, and the date loop would
# then never execute.)
end_dt="${2:-${start_dt}}"

##### Call when the script starts running; prints the current timestamp and PID
##### (whatever the NOW command string produces) followed by "job_start".
job_start() {
  local prefix
  prefix=$(eval $NOW)
  printf '%s job_start\n' "$prefix"
}

##### Call on the success path; prints the current timestamp and PID together
##### with the given message, then terminates the script with status 0.
job_success() {
  local detail="$*"
  local prefix
  prefix=$(eval $NOW)
  printf '%s job_success:[%s]\n' "$prefix" "$detail"
  exit 0
}

##### Call on the failure path; prints the current timestamp and PID together
##### with the given message, then terminates the script with status 1.
job_fail() {
  local detail="$*"
  local prefix
  prefix=$(eval $NOW)
  printf '%s job_fail:[%s]\n' "$prefix" "$detail"
  exit 1
}

job_start
###### On the job platform, whether a script succeeded or failed is decided solely by the exit status of the last statement it executes.
###### An exit status of 0 means the script succeeded; a non-zero status means it failed.
###### Start writing your script logic below.
#######################################
# Load one day of industrial-chain data into ODS and rebuild DWD from it.
#
# For every entry listed under <source>/<date>/output the entry is copied to
# the destination HDFS with distcp and then loaded into the ODS partition
# (dt=<date>, ic=<entry name>). Afterwards the DWD table is rebuilt from all
# ODS rows of that date.
#
# Arguments: $1 - date to process (yyyyMMdd)
# Outputs:   progress lines on stdout; on a Hive failure the SQL text is
#            printed before the error line
# Exits:     terminates the script with status 1 when distcp or Hive fails
#            (it does not return to the caller in that case)
#######################################
job() {
  local -r dt="$1"
  local -r src_root="hdfs://10.xx.xx.x:8020/aidata/industrial_chain"
  local -r dst_root="hdfs://10.32.49.19:8020/data/aidata/industrial_chain"
  local listing ic_path ic sql dwd_sql
  local -a ic_paths=()

  # A failed listing is reported but not fatal, so a date with no output
  # directory does not abort a multi-day run.
  if ! listing=$(hadoop fs -ls -C "${src_root}/${dt}/output"); then
    echo "warn ==>> ${dt} cannot list ${src_root}/${dt}/output"
  fi
  if [[ -n "$listing" ]]; then
    # One path per line; read into an array so the loop body cannot consume
    # the listing through stdin.
    mapfile -t ic_paths <<<"$listing"
  fi

  for ic_path in "${ic_paths[@]}"; do
    [[ -n "$ic_path" ]] || continue
    ic="${ic_path##*/}" # last path component is the ic partition value
    echo "当前日期:${dt} 当前分区:${ic} aidata路径:${ic_path}"

    # Stop here when the copy fails: LOAD DATA ... OVERWRITE must not run
    # against a missing or partially copied directory.
    if ! hadoop distcp "$ic_path" "${dst_root}/${dt}/${ic}"; then
      echo "error ==>> ${dt} ${ic_path} (distcp failed)"
      exit 1
    fi

    # Unquoted here-doc delimiter: ${dt}/${ic} are expanded by the shell,
    # while '\001' is passed through to Hive unchanged.
    sql=$(
      cat <<EOF
        set hive.execution.engine=mr;

        CREATE TABLE IF NOT EXISTS ods_b10000001_00003.ods_p_industry_chain_company(
        content string) PARTITIONED BY (dt string, ic string)
        row format delimited fields terminated by '\001';

        LOAD DATA INPATH '/data/aidata/industrial_chain/${dt}/${ic}'
        OVERWRITE INTO TABLE ods_b10000001_00003.ods_p_industry_chain_company PARTITION(dt='${dt}',ic='${ic}');
EOF
    )
    if hive -e "$sql"; then
      echo "sucess ==>> ${dt} ${ic_path}"
    else
      echo "$sql"
      echo "error ==>> ${dt} ${ic_path}"
      exit 1
    fi
  done

  # \$ keeps the JSON path '$.…' literal for Hive instead of letting the
  # shell expand it.
  dwd_sql=$(
    cat <<EOF
        set hive.execution.engine=mr;
        set hive.exec.dynamic.partition.mode=nonstrict;

        CREATE TABLE IF NOT EXISTS dwd_b10000001_00003.dwd_p_industry_chain_company (
        company_id string comment "企业唯一标识",
        industry_code string comment "产业编码",
        industry_score string comment "产业评分"
        ) COMMENT "产业链" partitioned by (ic string)
        row format delimited fields terminated by '\001';

        INSERT OVERWRITE TABLE dwd_b10000001_00003.dwd_p_industry_chain_company partition(ic)
        SELECT
        GET_JSON_OBJECT(a.content, '\$.source.company_id') as company_id,
        REPLACE(GET_JSON_OBJECT(a.content, '\$.fields.nodeCode[0]'),'-','') AS industry_code,
        GET_JSON_OBJECT(a.content, '\$.score') as industry_score,
        a.ic
        FROM ods_b10000001_00003.ods_p_industry_chain_company a
        WHERE a.dt='${dt}' GROUP BY a.content, a.ic

EOF
  )
  if hive -e "$dwd_sql"; then
    echo "sucess dwd_sql ==>> ${dt}"
  else
    echo "$dwd_sql"
    echo "error ==>> ${dt}"
    exit 1
  fi
}

# Reject malformed or impossible dates up front: a value that `date` cannot
# parse would otherwise never advance and the loop below would spin forever.
for boundary in "$start_dt" "$end_dt"; do
  if [[ ! "$boundary" =~ ^[0-9]{8}$ ]] ||
    ! date -d "$boundary" +%Y%m%d >/dev/null 2>&1; then
    job_fail "invalid date '${boundary}', expected yyyyMMdd"
  fi
done

# Run the job once per day for every date in [start_dt, end_dt].
# yyyyMMdd values order correctly as integers; 10# forces base 10.
while ((10#$start_dt <= 10#$end_dt)); do
  echo "$start_dt"
  job "$start_dt" || job_fail "job failed for ${start_dt}"
  # Advance by one calendar day; stop instead of looping on the same date
  # if the next date cannot be computed.
  next_dt=$(date -d "${start_dt} +1 day" +%Y%m%d) ||
    job_fail "cannot compute the day after ${start_dt}"
  start_dt="$next_dt"
done

# Every failure path above has already exited with status 1, so reaching
# this point means all dates were processed.
job_success
举报

相关推荐

0 条评论