hadoop-streaming universal script templates


Shell comparison operators

Comparisons in shell test expressions do not use the plain >, =, < symbols; they use the operators below:

-eq     // equal to
-ne     // not equal to
-gt     // greater than
-lt     // less than
-ge     // greater than or equal to
-le     // less than or equal to
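
For example, a minimal numeric range check (the variable name is illustrative):

count=5
if [ $count -ge 3 ] && [ $count -lt 10 ]; then
    echo "count is in [3, 10)"
fi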

A shell logging function

log(){
    logstr=$1
    now=`date "+%Y%m%d-%H:%M:%S"`
    echo -e "[DEBUG ${now}] ${logstr}"
}
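
Usage, assuming the function above is defined earlier in the same script:

log "loading train data"
# prints something like: [DEBUG 20200101-12:00:00] loading train data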

Shell code to check whether an MR job finished successfully

function hdp_succ(){
    hdfiles=$1
    max=$2        # maximum number of polls
    waittime=$3   # sleep between polls, e.g. "5m"
    # poll until the _SUCCESS marker appears or the retries run out
    for((i=0;i<$max;i+=1)); do
        ${HADOOP} fs -test -e $hdfiles/_SUCCESS
        if [ $? -eq 0 ] ; then
            break
        fi
        log "waiting..."
        sleep $waittime
    done
    # i == max means the loop was exhausted without finding the marker
    if [ $i -eq $max ] && [ $max -eq 36 ] ; then
        curl --data-urlencode "subject=gpu2_deepfm:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=alarm&"
        exit 255
    fi
    if [ $i -eq $max ] && [ $max -eq 6 ] ; then
        curl --data-urlencode "subject=gpu2_deepfm:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=alarm&"
    fi
}

hdp_succ ${hdfs_traindata} 36 "5m"    # poll every 5 minutes, up to 36 times (3 hours)
hdp_succ ${hdfs_evaldata} 36 "5m"

Pulling files from the Hadoop cluster to local disk

rm -rf $train_data
mkdir $train_data
# list the part files under the HDFS directory (the path is field 8 of `fs -ls` output)
files=`$HADOOP fs -ls ${hdfs_traindata} | awk -F ' ' '{print $8}' | grep 'part'`
for file_name in $files
do
    echo $file_name
    $HADOOP fs -get $file_name ${train_data}/.
done
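
When the part files only need to be concatenated into a single local file, hadoop fs -getmerge is a shorter alternative (a sketch using the same variables as above):

$HADOOP fs -getmerge ${hdfs_traindata} ${train_data}/train_data.txt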

Shell engineering notes

Getting dates

TODAY=`date +%Y%m%d`
lastday=`date +"%Y-%m-%d" -d "-1 day"`     # e.g. 2020-01-01
lastday_v2=`date +"%Y%m%d" -d "-1 day"`    # e.g. 20200101

Multiple inputs for hadoop-streaming

INPUT_PAT=""
for ((k=$((i+0));k<$((i+30));k+=1));do
day_tmp=`date +%Y%m%d -d "$k days ago"`
INPUT_PAT="${hdfs_feedback_log_dir}/${day_tmp} ${INPUT_PAT}"
done

hadoop streaming \
-input $INPUT_PAT \
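
hadoop-streaming also accepts the -input flag multiple times, so an equivalent way to build the argument list, without relying on word-splitting a single -input value, is (a sketch with the same variables, starting from today):

INPUT_ARGS=""
for ((k=0;k<30;k+=1));do
    day_tmp=`date +%Y%m%d -d "$k days ago"`
    INPUT_ARGS="${INPUT_ARGS} -input ${hdfs_feedback_log_dir}/${day_tmp}"
done

hadoop streaming \
    $INPUT_ARGS \
    ...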

A generic hadoop-streaming reducer (BI-style aggregation)

import sys

def handle_line(segs, infos):
    # accumulate every numeric column after the key into infos
    for i in range(len(infos)):
        infos[i] += int(segs[i + 1])
    return infos

def print_line(last_id, infos):
    print_infos = [last_id]
    for i in infos:
        print_infos.append(str(i))
    print('\t'.join(print_infos))

def reducer1():
    last_id = ""
    infos = []
    flag = False
    for line in sys.stdin:
        line = line.strip("\n")
        if len(line) > 0:
            try:
                segs = line.split("\t")
                if not flag:
                    # first record: initialize the accumulators
                    last_id = segs[0]
                    infos = [0] * (len(segs) - 1)
                if segs[0] == last_id:
                    flag = True
                    infos = handle_line(segs, infos)
                else:
                    # key changed: flush the finished group, then start a new one
                    print_line(last_id, infos)
                    infos = [0] * (len(segs) - 1)
                    last_id = segs[0]
                    infos = handle_line(segs, infos)
            except:
                infos = [0] * (len(line.split("\t")) - 1)
                continue
    print_line(last_id, infos)
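
Because MapReduce feeds the reducer keys in sorted order, the reducer can be smoke-tested locally by sorting a sample file first (reducer.py is a hypothetical wrapper that calls reducer1() from the code above):

cat sample.tsv | sort -t$'\t' -k1,1 | python reducer.py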

hadoop-streaming template

#!/bin/bash
set -x
HADOOP_HOME=/usr/bin/hadoop/software/hadoop/
hadoop=$HADOOP_HOME/bin/hadoop

map=cat
red=cat
map_num=2000
red_num=400
job_name=majing1-pd_template
priority=HIGH
alarm=''
delete=1
download=''
#compress="-jobconf mapred.compress.map.output=true -jobconf mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec -jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
compress="-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"

while getopts "i:o:m:r:x:y:k:s:n:f:p:d:a:c:l:h:e:" opt
do
    case "$opt" in
        a) alarm=$OPTARG;;      # alarm subject to send if the job fails
        d) delete=$OPTARG;;     # non-zero: remove the output dir before running
        l) download=$OPTARG;;   # local file to dump the job output into
        i) input=$OPTARG;;
        o) output=$OPTARG;;
        m) map=$OPTARG;;
        r) red=$OPTARG;;
        x) map_num=$OPTARG;;
        y) red_num=$OPTARG;;
        n) job_name=$OPTARG;;
        f) f=$OPTARG;;          # local file to ship with the job (-file)
        p) priority=$OPTARG;;
        s)
            # write records to per-suffix output files
            suffix_out="-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat"
            suffix_out_value="-jobconf suffix.multiple.outputformat.filesuffix=$OPTARG";;
        k)
            # first two fields of map output form the key; partition on the first
            key_field="-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"
            key_field_value1="-jobconf stream.num.map.output.key.fields=2"
            key_field_value2="-jobconf num.key.fields.for.partition=1";;
        h)
            echo "cacheArchive:"$OPTARG
            cacheArchive="-cacheArchive $OPTARG";;
        e)
            echo "cacheFile:"$OPTARG
            cacheFile="-cacheFile $OPTARG";;
        c)
            # -c consumes a dummy argument and disables output compression
            unset compress;;
        ?) echo '?';;
    esac
done

function rawlog(){
    if [ $delete -ne 0 ]; then
        $hadoop fs -rmr $output
    fi
    $hadoop streaming \
        -input $input \
        -output "$output" \
        -mapper "$map" \
        -reducer "$red" \
        -file $f \
        $cacheArchive \
        $cacheFile \
        $key_field \
        $suffix_out \
        $suffix_out_value \
        $key_field_value1 \
        $key_field_value2 \
        $compress \
        -jobconf mapred.job.name=$job_name \
        -jobconf mapred.map.tasks=$map_num \
        -jobconf mapred.job.priority=$priority \
        -jobconf mapred.reduce.tasks=$red_num \
        -jobconf mapred.success.file.status=true \
        -jobconf mapred.reduce.slowstart.completed.maps=0.9999 \
        -jobconf mapreduce.job.queuename=hdp-svideo

    # alarm and bail out if the job did not produce a _SUCCESS marker
    $hadoop fs -test -e $output/_SUCCESS
    if [ $? -ne 0 ]; then
        if [ "$alarm" != '' ]; then
            curl --data-urlencode "subject=${alarm}" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        fi
        echo "$job_name failed"
        exit 255
    fi
    # optionally pull the (possibly compressed) output down to a local file
    if [[ $download != '' ]]; then
        $hadoop fs -text $output/* > $download
    fi
}

rawlog
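
A typical invocation of this template, saved here as run_streaming.sh (the script and path names are illustrative):

sh run_streaming.sh \
    -i "${hdfs_input}/20200101/*" \
    -o "${hdfs_output}/20200101" \
    -m cat \
    -r "python reducer.py" \
    -f reducer.py \
    -n bi_agg_20200101 \
    -a "bi_agg:failed" \
    -l ./bi_agg.txt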

Shell utility template

HADOOP=/usr/bin/hadoop/software/hadoop/bin/hadoop
PYTHON=/home/hdp-svideo-algo/majing1-pd/miniconda3/bin/python
HPY3=/opt/soft/anaconda3/bin/python
HPY2=/opt/soft/anaconda2/bin/python

function log(){
    logstr=$1
    now=`date "+%Y%m%d-%H:%M:%S"`
    echo -e "[DEBUG ${now}] ${logstr}"
}

function alarm(){
    content=$1
    curl --data-urlencode "subject=${content}" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
}

function hdp_succ(){
    hdfiles=$1
    max=$2        # maximum number of polls
    waittime=$3
    flag=$4       # alarm subject prefix
    for((i=0;i<$max;i+=1)); do
        ${HADOOP} fs -test -e $hdfiles/_SUCCESS
        if [ $? -eq 0 ] ; then
            break
        fi
        log "waiting..."
        sleep $waittime
    done
    # i == max: the wait was exhausted; which alarm fires depends on max
    if [ $i -eq $max ] && [ $max -eq 36 ] ; then
        curl --data-urlencode "subject=${flag}:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
    if [ $i -eq $max ] && [ $max -eq 2 ] ; then
        curl --data-urlencode "subject=${flag}:next,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
    # warn once if it took 6 polls to get here
    if [ $i -eq 6 ] ; then
        curl --data-urlencode "subject=${flag}:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
}

function hdp_succ_2(){
    # same as hdp_succ, but the success-marker file name is configurable
    hdfiles=$1
    max=$2
    waittime=$3
    flag=$4
    succfile=$5   # e.g. "_SUCCESS" or a custom done-flag file
    for((i=0;i<$max;i+=1)); do
        ${HADOOP} fs -test -e $hdfiles/${succfile}
        if [ $? -eq 0 ] ; then
            break
        fi
        log "waiting..."
        sleep $waittime
    done
    if [ $i -eq $max ] && [ $max -eq 36 ] ; then
        curl --data-urlencode "subject=${flag}:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
    if [ $i -eq $max ] && [ $max -eq 2 ] ; then
        curl --data-urlencode "subject=${flag}:next,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
    if [ $i -eq 6 ] ; then
        curl --data-urlencode "subject=${flag}:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
}

function result_succ(){
    hdfiles=$1
    flag=$2
    ${HADOOP} fs -test -e $hdfiles/_SUCCESS
    if [ $? -ne 0 ] ; then
        curl --data-urlencode "subject=${flag}_failed" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit
    fi
}

function get_date(){
    before_nday=$1
    day=`date +%Y%m%d -d "$before_nday days ago"`
    echo ${day}
}
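
Usage:

yesterday=$(get_date 1)    # e.g. 20200101
week_ago=$(get_date 7)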

function get_max_hdp_file(){
    # scan backwards up to 60 days and echo the newest date whose partition
    # both has a _SUCCESS marker and exceeds the size threshold
    path1=$1
    path2=$2
    minline=$3   # threshold compared against the first field of `fs -dus` output
    for (( i=0; i<60; i++ ));do
        date=$(get_date $i)
        path_detail=${path1}/${date}/${path2}
        ${HADOOP} fs -test -e ${path_detail}/_SUCCESS
        if [ $? -eq 0 ] ; then
            count=`${HADOOP} fs -dus ${path_detail} | awk -F ' ' '{print $1}'`
            if [ $count -gt $minline ] ; then
                break
            fi
        fi
    done
    echo ${date}
}

function get_max_hdp_file_2(){
    # like get_max_hdp_file, but with an extra path suffix appended to the date
    path1=$1
    path2=$2
    minline=$3
    path3=$4
    for (( i=0; i<30; i++ ));do
        date=$(get_date $i)
        path_detail=${path1}/${date}${path3}/${path2}
        ${HADOOP} fs -test -e ${path_detail}/_SUCCESS
        if [ $? -eq 0 ] ; then
            count=`${HADOOP} fs -dus ${path_detail} | awk -F ' ' '{print $1}'`
            if [ $count -gt $minline ] ; then
                break
            fi
        fi
    done
    echo ${date}
}
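
Typical usage, combining the helpers above to pick the newest usable partition before kicking off a job (the root path and size threshold are illustrative):

latest=$(get_max_hdp_file /home/hdp-svideo/feature daily_stat 100000000)
log "using partition ${latest}"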

