sftp数据拉到hdfs中表中

阅读 34

05-21 12:00

#!/usr/bin/env -S bash --posix
# 文件名:sftp_to_hdfs.sh
# 功能:多用户动态日期目录的SFTP数据拉取(自动删除.ok文件)并上传HDFS
# 作者:王昱翔
# 版本:2.0(新增自动删除.ok文件功能)

set +o posix 

# -------------------------------- 配置参数 --------------------------------
declare -A SFTP_ACCOUNTS=(
    ["用户"]="密码"
    ["用户"]="密码"
    ["用讯"]="密码"
)

SFTP_HOST="yuxiang.want.com"  # SFTP服务器地址
SFTP_PORT="2222"                  # SFTP端口

declare -A SFTP_PATHS=(
    ["sms"]="/upload/receipt/sms/"
    ["outbound"]="/upload/receipt/outbound"
)

declare -A LOCAL_PATHS=(
    ["sms"]="/var/lib/hadoop-hdfs/sms" 
    ["outbound"]="/var/lib/hadoop-hdfs/outbound"
)

declare -A HDFS_PATHS=(
    ["sms"]="/user/hive/warehouse/tc_ods.db/sh_cxj_dyy_sms_di/dt="
    ["outbound"]="/user/hive/warehouse/tc_ods.db/sh_cxj_dyy_outbound_di/dt="
)

LOG_FILE="/var/lib/hadoop-hdfs/multi_sftp_to_hdfs.log"

# -------------------------------- 函数定义 --------------------------------
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

validate_date() {
    [[ $1 =~ ^[0-9]{8}$ ]] && date -d "$1" >/dev/null 2>&1
}

# -------------------------------- 初始化检查 --------------------------------
check_command() {
    if ! command -v "$1" &>/dev/null; then
        log "错误:未安装 $1,请先安装"
        exit 1
    fi
}

check_command sshpass
check_command sftp
check_command hdfs
mkdir -p "$(dirname "$LOG_FILE")" || { log "无法创建日志目录"; exit 1; }

# -------------------------------- 主处理逻辑 --------------------------------
for user_key in "${!SFTP_ACCOUNTS[@]}"; do
    SFTP_USER="$user_key"
    SFTP_PASSWORD="${SFTP_ACCOUNTS[$user_key]}"
    
    log "====== 开始处理用户:$SFTP_USER ======"
    
    for path_key in "${!SFTP_PATHS[@]}"; do
        # 初始化路径变量
        REMOTE_DIR="${SFTP_PATHS[$path_key]}"
        LOCAL_ROOT="${LOCAL_PATHS[$path_key]}"
        HDFS_PREFIX="${HDFS_PATHS[$path_key]}"
        
        # 获取远程日期目录列表
        log "[$SFTP_USER][$path_key] 正在获取远程日期目录..."
        date_dirs=$(sshpass -p "$SFTP_PASSWORD" sftp -oStrictHostKeyChecking=no -P "$SFTP_PORT" "${SFTP_USER}@${SFTP_HOST}" <<< "ls -l $REMOTE_DIR" 2>/dev/null | awk '/^drwx/ {print $NF}')
        
        [ -z "$date_dirs" ] && log "[$SFTP_USER][$path_key] 未找到有效日期目录" && continue
        
        for date_dir in $date_dirs; do
            # 验证日期格式(YYYYMMDD)
            if ! validate_date "$date_dir"; then
                log "[$SFTP_USER][$path_key] 跳过无效日期目录:$date_dir"
                continue
            fi
            
            # 动态生成路径
            LOCAL_DIR="${LOCAL_ROOT}/${date_dir}"
            HDFS_TARGET="${HDFS_PREFIX}${date_dir}"
            
            # 创建本地缓存目录
            mkdir -p "$LOCAL_DIR" || { log "[$SFTP_USER][$path_key] 无法创建本地目录:$LOCAL_DIR"; continue; }

            # ------------------------ SFTP 下载 ------------------------
            log "[$SFTP_USER][$path_key][$date_dir] 开始SFTP连接"
            sshpass -p "$SFTP_PASSWORD" sftp -o StrictHostKeyChecking=no -P "$SFTP_PORT" "${SFTP_USER}@${SFTP_HOST}" <<EOF
cd "${REMOTE_DIR}/${date_dir}"
lcd "$LOCAL_DIR"
get -r *
bye
EOF

            if [ $? -ne 0 ]; then
                log "[$SFTP_USER][$path_key][$date_dir] SFTP文件拉取失败"
                continue
            else
                # 关键修改点:删除所有 .ok 文件
                log "[$SFTP_USER][$path_key][$date_dir] 正在清理 .ok 文件..."
                find "$LOCAL_DIR" -type f -name "*.ok" -delete 2>/dev/null
                ok_count=$(find "$LOCAL_DIR" -type f -name "*.ok" | wc -l)
                [ "$ok_count" -gt 0 ] && log "[警告] 仍有残留 .ok 文件: $ok_count 个"
                
                # 统计有效文件
                file_count=$(find "$LOCAL_DIR" -maxdepth 1 -type f | wc -l)
                log "[$SFTP_USER][$path_key][$date_dir] SFTP拉取成功,剩余文件数:$file_count"
            fi

            # ------------------------ HDFS 上传 ------------------------
            log "[$SFTP_USER][$path_key][$date_dir] 开始HDFS上传"
            hdfs dfs -mkdir -p "$HDFS_TARGET" || { log "[$SFTP_USER][$path_key][$date_dir] HDFS目录创建失败"; continue; }
            
            # 使用管道上传防止参数溢出
            uploaded_count=0
            while IFS= read -r file; do
                hdfs dfs -put "$file" "$HDFS_TARGET" && ((uploaded_count++))
            done < <(find "$LOCAL_DIR" -maxdepth 1 -type f)
            
            if [ $uploaded_count -eq "$file_count" ]; then
                log "[$SFTP_USER][$path_key][$date_dir] HDFS上传成功,路径:hdfs://$HDFS_TARGET"
            else
                log "[$SFTP_USER][$path_key][$date_dir] HDFS上传异常(成功 $uploaded_count/$file_count)"
            fi

            # ------------------------ 清理缓存 ------------------------
            [ -n "$LOCAL_DIR" ] && rm -rf "${LOCAL_DIR:?}/"*
            log "[$SFTP_USER][$path_key][$date_dir] 本地缓存已清理"
        done
    done
    log "====== 用户处理完成:$SFTP_USER ======\n"
done

log "所有用户处理完成"

精彩评论(0)

0 0 举报