#!/usr/bin/env -S bash --posix
# 文件名:sftp_to_hdfs.sh
# 功能:多用户动态日期目录的SFTP数据拉取(自动删除.ok文件)并上传HDFS
# 作者:王昱翔
# 版本:2.0(新增自动删除.ok文件功能)
set +o posix
# -------------------------------- 配置参数 --------------------------------
declare -A SFTP_ACCOUNTS=(
["用户"]="密码"
["用户"]="密码"
["用讯"]="密码"
)
SFTP_HOST="yuxiang.want.com" # SFTP服务器地址
SFTP_PORT="2222" # SFTP端口
declare -A SFTP_PATHS=(
["sms"]="/upload/receipt/sms/"
["outbound"]="/upload/receipt/outbound"
)
declare -A LOCAL_PATHS=(
["sms"]="/var/lib/hadoop-hdfs/sms"
["outbound"]="/var/lib/hadoop-hdfs/outbound"
)
declare -A HDFS_PATHS=(
["sms"]="/user/hive/warehouse/tc_ods.db/sh_cxj_dyy_sms_di/dt="
["outbound"]="/user/hive/warehouse/tc_ods.db/sh_cxj_dyy_outbound_di/dt="
)
LOG_FILE="/var/lib/hadoop-hdfs/multi_sftp_to_hdfs.log"
# -------------------------------- 函数定义 --------------------------------
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
validate_date() {
[[ $1 =~ ^[0-9]{8}$ ]] && date -d "$1" >/dev/null 2>&1
}
# -------------------------------- 初始化检查 --------------------------------
check_command() {
if ! command -v "$1" &>/dev/null; then
log "错误:未安装 $1,请先安装"
exit 1
fi
}
check_command sshpass
check_command sftp
check_command hdfs
mkdir -p "$(dirname "$LOG_FILE")" || { log "无法创建日志目录"; exit 1; }
# -------------------------------- 主处理逻辑 --------------------------------
for user_key in "${!SFTP_ACCOUNTS[@]}"; do
SFTP_USER="$user_key"
SFTP_PASSWORD="${SFTP_ACCOUNTS[$user_key]}"
log "====== 开始处理用户:$SFTP_USER ======"
for path_key in "${!SFTP_PATHS[@]}"; do
# 初始化路径变量
REMOTE_DIR="${SFTP_PATHS[$path_key]}"
LOCAL_ROOT="${LOCAL_PATHS[$path_key]}"
HDFS_PREFIX="${HDFS_PATHS[$path_key]}"
# 获取远程日期目录列表
log "[$SFTP_USER][$path_key] 正在获取远程日期目录..."
date_dirs=$(sshpass -p "$SFTP_PASSWORD" sftp -oStrictHostKeyChecking=no -P "$SFTP_PORT" "${SFTP_USER}@${SFTP_HOST}" <<< "ls -l $REMOTE_DIR" 2>/dev/null | awk '/^drwx/ {print $NF}')
[ -z "$date_dirs" ] && log "[$SFTP_USER][$path_key] 未找到有效日期目录" && continue
for date_dir in $date_dirs; do
# 验证日期格式(YYYYMMDD)
if ! validate_date "$date_dir"; then
log "[$SFTP_USER][$path_key] 跳过无效日期目录:$date_dir"
continue
fi
# 动态生成路径
LOCAL_DIR="${LOCAL_ROOT}/${date_dir}"
HDFS_TARGET="${HDFS_PREFIX}${date_dir}"
# 创建本地缓存目录
mkdir -p "$LOCAL_DIR" || { log "[$SFTP_USER][$path_key] 无法创建本地目录:$LOCAL_DIR"; continue; }
# ------------------------ SFTP 下载 ------------------------
log "[$SFTP_USER][$path_key][$date_dir] 开始SFTP连接"
sshpass -p "$SFTP_PASSWORD" sftp -o StrictHostKeyChecking=no -P "$SFTP_PORT" "${SFTP_USER}@${SFTP_HOST}" <<EOF
cd "${REMOTE_DIR}/${date_dir}"
lcd "$LOCAL_DIR"
get -r *
bye
EOF
if [ $? -ne 0 ]; then
log "[$SFTP_USER][$path_key][$date_dir] SFTP文件拉取失败"
continue
else
# 关键修改点:删除所有 .ok 文件
log "[$SFTP_USER][$path_key][$date_dir] 正在清理 .ok 文件..."
find "$LOCAL_DIR" -type f -name "*.ok" -delete 2>/dev/null
ok_count=$(find "$LOCAL_DIR" -type f -name "*.ok" | wc -l)
[ "$ok_count" -gt 0 ] && log "[警告] 仍有残留 .ok 文件: $ok_count 个"
# 统计有效文件
file_count=$(find "$LOCAL_DIR" -maxdepth 1 -type f | wc -l)
log "[$SFTP_USER][$path_key][$date_dir] SFTP拉取成功,剩余文件数:$file_count"
fi
# ------------------------ HDFS 上传 ------------------------
log "[$SFTP_USER][$path_key][$date_dir] 开始HDFS上传"
hdfs dfs -mkdir -p "$HDFS_TARGET" || { log "[$SFTP_USER][$path_key][$date_dir] HDFS目录创建失败"; continue; }
# 使用管道上传防止参数溢出
uploaded_count=0
while IFS= read -r file; do
hdfs dfs -put "$file" "$HDFS_TARGET" && ((uploaded_count++))
done < <(find "$LOCAL_DIR" -maxdepth 1 -type f)
if [ $uploaded_count -eq "$file_count" ]; then
log "[$SFTP_USER][$path_key][$date_dir] HDFS上传成功,路径:hdfs://$HDFS_TARGET"
else
log "[$SFTP_USER][$path_key][$date_dir] HDFS上传异常(成功 $uploaded_count/$file_count)"
fi
# ------------------------ 清理缓存 ------------------------
[ -n "$LOCAL_DIR" ] && rm -rf "${LOCAL_DIR:?}/"*
log "[$SFTP_USER][$path_key][$date_dir] 本地缓存已清理"
done
done
log "====== 用户处理完成:$SFTP_USER ======\n"
done
log "所有用户处理完成"