目录
Hive数仓ETL过程
- Synchronize industrial chain data to HDFS
 
# Copy the raw industrial-chain data between clusters (source HDFS -> warehouse HDFS).
# The "xx" octets are placeholder IPs to be filled in per environment.
hadoop distcp hdfs://10.xx.xx.x:8020/aidata/industrial_chain/ hdfs://10.xx.xx.xx:8020/data/aidata/industrial_chain/
- Creating tables in ODS
 
-- ODS landing table: one raw JSON document per row in `content`,
-- partitioned by load date (dt) and industry-chain code (ic).
CREATE TABLE IF NOT EXISTS ods_b10000001_00003.ods_p_industry_chain_company(
  `content` string)
PARTITIONED BY (dt string, ic string)
row format delimited fields terminated by '\001';  -- fix: statement was unterminated (cf. the script's copy of this DDL)
- Load data into ODS
 
-- {yyyyMMdd} and {ic} are placeholders substituted per date/partition by the driver script below.
LOAD DATA INPATH '/data/aidata/industrial_chain/{yyyyMMdd}/{ic}' 
OVERWRITE INTO TABLE ods_b10000001_00003.ods_p_industry_chain_company PARTITION(dt='{yyyyMMdd}',ic='{ic}');
- Processing ODS data into DWD
 
-- DWD table: fields parsed out of the ODS JSON, dynamically partitioned by ic.
CREATE TABLE IF NOT EXISTS dwd_b10000001_00003.dwd_p_industry_chain_company (
company_id string comment "企业唯一标识",
industry_code string comment "产业编码",
industry_score string comment "产业评分"
) COMMENT "产业链" partitioned by (ic string)
row format delimited fields terminated by '\001';

-- Dynamic-partition INSERT without a static partition value requires nonstrict
-- mode (the executable script below sets the same flag).
set hive.exec.dynamic.partition.mode=nonstrict;

-- Fix: "(partition ic)" after the table name is invalid HiveQL; the clause is
-- PARTITION(ic), matching the script's version of this statement.
-- NOTE(review): the script additionally appends GROUP BY a.content, a.ic to
-- deduplicate rows; confirm whether this documented form should match.
INSERT OVERWRITE TABLE dwd_b10000001_00003.dwd_p_industry_chain_company PARTITION(ic)
SELECT
GET_JSON_OBJECT(a.content, '$.source.company_id') as company_id,
REPLACE(GET_JSON_OBJECT(a.content, '$.fields.nodeCode[0]'),'-','') AS industry_code,
GET_JSON_OBJECT(a.content, '$.score') as industry_score,
a.ic
FROM ods_b10000001_00003.ods_p_industry_chain_company a
WHERE a.dt='{yyyyMMdd}';
- Shell script
 
#!/bin/bash
# Daily ETL driver: sync industrial-chain data from the source HDFS cluster,
# load it into the Hive ODS layer, then build the DWD layer.
# Usage: script.sh [start_dt [end_dt]]   (dates in yyyyMMdd form)
anynowtime="date +'%Y-%m-%d %H:%M:%S'"
NOW="echo [\`$anynowtime\`][PID:$$]"
# start_dt defaults to yesterday when no argument is given.
if [ -n "${1:-}" ]; then
  start_dt="$1"
else
  start_dt=$(date +"%Y%m%d" -d "-1day")
fi
# end_dt defaults to start_dt. Bug fix: this was end_dt="$1", which is empty
# when the script runs with no arguments, so the date loop never executed.
if [ -n "${2:-}" ]; then
  end_dt="$2"
else
  end_dt="$start_dt"
fi
job_start() {
  # Emit a start marker prefixed with the timestamp/PID template in $NOW.
  local stamp
  stamp=$(eval "$NOW")
  echo "${stamp} job_start"
}
job_success() {
  # Log a success marker plus any caller-supplied message, then stop with status 0.
  local msg="$*"
  local stamp
  stamp=$(eval "$NOW")
  echo "${stamp} job_success:[${msg}]"
  exit 0
}
job_fail() {
  # Log a failure marker plus any caller-supplied message, then stop with status 1.
  local msg="$*"
  local stamp
  stamp=$(eval "$NOW")
  echo "${stamp} job_fail:[${msg}]"
  exit 1
}
# Emit the start-of-job log line before any processing begins.
job_start
function job() {
  # Run the full ETL for one date ($1, yyyyMMdd):
  #   1. distcp each industry-chain partition from the source cluster,
  #   2. LOAD each one into the ODS table partition (dt, ic),
  #   3. rebuild the DWD table for that date via dynamic partitioning.
  # Exits the whole script with status 1 on the first failure.
  local dt="$1"
  local ic part sql dwd_sql
  for ic in $(hadoop fs -ls -C "hdfs://10.xx.xx.x:8020/aidata/industrial_chain/${dt}/output"); do
    part="${ic##*/}"  # partition name = last path component
    echo "当前日期:${dt} 当前分区:${part} aidata路径:${ic}"
    # Fix: distcp failures were previously ignored and the LOAD ran anyway
    # against a missing/partial target directory.
    # NOTE(review): target namenode IP is hardcoded here while the doc above
    # uses placeholders — confirm it is correct for this environment.
    if ! hadoop distcp "$ic" "hdfs://10.32.49.19:8020/data/aidata/industrial_chain/${dt}/${part}"; then
      echo "error ==>> distcp failed ${dt} ${ic}"
      exit 1
    fi
    sql=$(
    cat <<-EOF
        set hive.execution.engine=mr;
        CREATE TABLE IF NOT EXISTS ods_b10000001_00003.ods_p_industry_chain_company(
        content string) PARTITIONED BY (dt string, ic string)
        row format delimited fields terminated by '\001';
        LOAD DATA INPATH '/data/aidata/industrial_chain/${dt}/${part}'
        OVERWRITE INTO TABLE ods_b10000001_00003.ods_p_industry_chain_company PARTITION(dt='${dt}',ic='${part}');
EOF
    )
    if hive -e "$sql"; then
      echo "success ==>> ${dt} ${ic}"  # typo fix: was "sucess"
    else
      echo "$sql"
      echo "error ==>> ${dt} ${ic}"
      exit 1
    fi
  done
  dwd_sql=$(
      cat <<-EOF
        set hive.execution.engine=mr;
        set hive.exec.dynamic.partition.mode=nonstrict;
        CREATE TABLE IF NOT EXISTS dwd_b10000001_00003.dwd_p_industry_chain_company (
        company_id string comment "企业唯一标识",
        industry_code string comment "产业编码",
        industry_score string comment "产业评分"
        ) COMMENT "产业链" partitioned by (ic string)
        row format delimited fields terminated by '\001';

        INSERT OVERWRITE TABLE dwd_b10000001_00003.dwd_p_industry_chain_company partition(ic)
        SELECT
        GET_JSON_OBJECT(a.content, '\$.source.company_id') as company_id,
        REPLACE(GET_JSON_OBJECT(a.content, '\$.fields.nodeCode[0]'),'-','') AS industry_code,
        GET_JSON_OBJECT(a.content, '\$.score') as industry_score,
        a.ic
        FROM ods_b10000001_00003.ods_p_industry_chain_company a
        WHERE a.dt='${dt}' GROUP BY a.content, a.ic
EOF
  )
  if hive -e "$dwd_sql"; then
    echo "success dwd_sql ==>> ${dt}"  # typo fix: was "sucess"
  else
    echo "$dwd_sql"
    echo "error ==>> ${dt}"
    exit 1
  fi
}
# Iterate the inclusive date range [start_dt, end_dt], one ETL run per day.
while [[ "$start_dt" -le "$end_dt" ]]; do
  echo "$start_dt"
  job "$start_dt"
  # Advance one day. Fix: the original used `let` (meant for arithmetic) with
  # the confusing GNU date phrase "-1 days ago" (a double negative for +1 day).
  start_dt=$(date -d "${start_dt} +1 day" +%Y%m%d)
done
# job() exits non-zero on any failure, so reaching this point means all days
# succeeded. The old `if [ $? == 0 ] ... else job_fail` checked the status of
# the loop/`let`, making the job_fail branch unreachable dead code.
job_success