I. Data Structures
1. video table

| Field | Description | Details |
| --- | --- | --- |
| videoId | unique video id (String) | 11-character string |
| uploader | video uploader (String) | username of the uploading user |
| age | video age (Int) | integer number of days the video has been on the platform |
| category | video categories (Array&lt;String&gt;) | categories assigned when the video was uploaded |
| length | video length (Int) | video length as an integer |
| views | view count (Int) | number of times the video has been viewed |
| rate | video rating (Double) | rating out of 5 points |
| ratings | traffic (Int) | integer traffic count for the video |
| comments | comment count (Int) | integer number of comments on the video |
| relatedId | related video ids (Array&lt;String&gt;) | ids of related videos, at most 20 |
2. user table

| Field | Description | Type |
| --- | --- | --- |
| uploader | uploader username | string |
| videos | number of uploaded videos | int |
| friends | number of friends | int |
II. ETL Data Cleaning
1. Inspect the raw data, identify the split rules, and clean out invalid records
SDNkMu8ZT68 w00dy911 630 People & Blogs 186 10181 3.49 494 257 rjnbgpPJUks
A look at the raw format shows that a video can belong to several categories, separated by the '&' character with a space on each side, and that a video can likewise have several related videos, separated by '\t'. To make these multi-valued fields easy to work with during analysis, we first restructure and clean the data: join all categories with '&' after stripping the surrounding spaces, and join the multiple related video ids with '&' as well.
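Applying these rules to the sample line above, the category field loses its spaces and the single related id stays as-is, so the cleaned line reads:
SDNkMu8ZT68 w00dy911 630 People&Blogs 186 10181 3.49 494 257 rjnbgpPJUks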
2. Write the cleaning code according to the requirements
1. Create a Maven project, add the required dependencies, and set up logging
Configure pom.xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <!--<dependency>-->
    <!--    <groupId>org.apache.hadoop</groupId>-->
    <!--    <artifactId>hadoop-client-runtime</artifactId>-->
    <!--    <version>3.1.3</version>-->
    <!--</dependency>-->
</dependencies>
Configure log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Configure log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- Console appender; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <!-- PatternLayout; output looks like
                 [INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
    </Appenders>
    <Loggers>
        <!-- additivity disabled so events are not passed up to the root logger -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
        <!-- root logger configuration -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>
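Note: log4j-slf4j-impl binds SLF4J to Log4j 2, which reads log4j2.xml; the log4j.properties file above is only consulted by components still on Log4j 1.x, so keeping both is a compatibility measure.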
2. Write the Mapper and Driver
package com.hive.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * ETL mapper
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // counter for records that pass the checks
    private Counter pass;
    // counter for discarded records
    private Counter fail;
    // reusable StringBuilder for reassembling the fields of a line
    private StringBuilder sb = new StringBuilder();
    // reusable Text that hands the reassembled line to the context
    private Text result = new Text();

    /**
     * Initialize the counters.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        pass = context.getCounter("ETL", "PASS");
        fail = context.getCounter("ETL", "FAIL");
    }

    /**
     * Process one log line: strip the spaces inside the fourth field, switch the
     * separator of the trailing related video ids to '&', and discard lines that
     * do not have enough fields.
     * video id, uploader, age, category, length, views, rate, ratings, comments, related ids
     *     1         2      3      4        5      6      7      8        9          10
     * SDNkMu8ZT68 w00dy911 630 People & Blogs 186 10181 3.49 494 257 rjnbgpPJUks
     *
     * @param key     line offset
     * @param value   one log line
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one line of data
        String line = value.toString();
        // split the line into fields
        String[] fields = line.split("\t");
        // the first nine fields (video id through comments) are required,
        // so keep the line only if it has at least nine fields
        if (fields.length >= 9) {
            // remove the spaces from the fourth field and write it back
            fields[3] = fields[3].replace(" ", "");
            // reassemble the fields into one line, fixing the separators of the related ids
            sb.setLength(0); // reset the builder before each new line
            for (int i = 0; i < fields.length; i++) {
                if (i == fields.length - 1) {
                    // last field of the line: no trailing separator
                    sb.append(fields[i]);
                } else if (i <= 8) {
                    // one of the first nine fields: join with '\t'
                    sb.append(fields[i]).append("\t");
                } else {
                    // a related id: join with '&'
                    sb.append(fields[i]).append("&");
                }
            }
            result.set(sb.toString());
            context.write(result, NullWritable.get());
            // count the record as passed
            pass.increment(1);
        } else {
            // fewer than nine fields: discard the line
            fail.increment(1);
        }
    }
}
package com.hive.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ETLDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ETLDriver.class);
        job.setMapperClass(ETLMapper.class);
        // map-only job: with zero reducers the mapper output is the final output
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
3. Package the job, upload it to the cluster, and run the cleaning
yarn jar etltool-1.0-SNAPSHOT.jar com.hive.etl.ETLDriver /gulivideo/video /video_etl
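When the job completes, the counter summary it prints includes the ETL group defined in the mapper; the PASS and FAIL counts show how many lines were kept and how many were discarded.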
III. Create External Tables in Hive to Map the Data
1. Create and use the database
create database gulivideo;
use gulivideo;
2. Create the video external table
create external table video_ori(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
location '/gulivideo/video_etl';
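A quick sanity check (a sketch; any small probe of the array columns works) confirms the delimiters were interpreted correctly:
select videoId, category, size(relatedId) from video_ori limit 5;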
3. Create the user external table
create external table user_ori(
uploader string,
videos int,
friends int)
row format delimited fields terminated by "\t"
location '/gulivideo/user';
4. Create the video ORC managed table
create table video_orc(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>)
stored as orc
tblproperties("orc.compress"="SNAPPY");
5. Create the user ORC managed table
create table user_orc(
uploader string,
videos int,
friends int)
stored as orc
tblproperties("orc.compress"="SNAPPY");
6. Load the data
--insert the data from the external tables
insert into table video_orc select * from video_ori;
insert into table user_orc select * from user_ori;
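Optionally, compare row counts between the external and ORC tables to confirm the copy (a sketch):
select count(*) from video_ori;
select count(*) from video_orc;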
IV. Business Analysis
1. Top 10 videos by view count
A global sort on the views column with order by is all that is needed; limit the output to the first 10 rows.
SELECT
videoid,
views
FROM
video_orc
ORDER BY
views DESC
LIMIT 10;
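Note that order by in Hive performs the global sort through a single reducer; with limit 10 on a dataset of this size the cost is acceptable.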
2. Top 10 categories by heat
1. Define category heat (here assumed to be the number of videos in a category)
2. Use lateral view explode() to flatten the category column
select videoid,
cate
from video_orc
lateral view explode(category) tbl as cate;
3. Group by category and count the videoid column
select count(videoid) vd,
cate
from video_orc
lateral view explode(category) tbl as cate
group by cate;
4. Sort descending and take the first 10 with limit
select count(videoid) vd,
cate
from video_orc
lateral view
explode(category) tbl as cate
group by cate
order by vd desc
limit 10;
3. Categories of the 20 most-viewed videos, and how many Top-20 videos each category contains
1. Find the 20 most-viewed videos
select videoid, `views`, category
from video_orc
order by `views` desc
limit 20;
2. Flatten the categories
lateral view explode(category) tbl as cate
3. Group by category and count, as in the combined query below
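Putting the three steps together, one way to write the full query (a sketch consistent with the steps above):
select cate, count(videoid) hot
from (select videoid, `views`, category
      from video_orc
      order by `views` desc
      limit 20) t1
lateral view explode(category) tbl as cate
group by cate
order by hot desc;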
4. Rank the categories of the videos related to the Top 50 most-viewed videos
1. Find the 50 most-viewed videos and get their related video ids
select videoid, `views` ,relatedid
from video_orc
order by `views` desc
limit 50;
2. Flatten the related-video column
select explode(relatedid) rvid
from (select videoid, `views`, relatedid
from video_orc
order by `views` desc
limit 50) t1;
3. Join the flattened related ids back to video_orc to obtain their categories, deduplicating the related ids at the same time
select distinct t2.rvid, v.category
from (select explode(relatedid) rvid
from (select videoid, `views`, relatedid
from video_orc
order by `views` desc
limit 50) t1) t2
join video_orc v on t2.rvid = v.videoid;
4. Flatten the category column
select cate,rvid
from(select distinct t2.rvid, v.category
from (select explode(relatedid) rvid
from (select videoid, `views`, relatedid
from video_orc
order by `views` desc
limit 50) t1) t2
join video_orc v on t2.rvid = v.videoid) t3
lateral view explode(category) tbl as cate;
5. The steps above extracted the categories; this step computes each category's heat
select count(videoid) vd,
cate
from video_orc
lateral view
explode(category) tbl as cate
group by cate;
6. Join the flattened category list from step 4 with the category-heat table on cate, then compute and sort the heat
select distinct t4.cate, t5.vd
from (select cate, rvid
from (select distinct t2.rvid, v.category
from (select explode(relatedid) rvid
from (select videoid, `views`, relatedid
from video_orc
order by `views` desc
limit 50) t1) t2
join video_orc v on t2.rvid = v.videoid) t3
lateral view explode(category) tbl as cate) t4
join (select count(videoid) vd,
cate
from video_orc
lateral view
explode(category) tbl as cate
group by cate) t5 on t4.cate = t5.cate
order by t5.vd desc;
5. Top 10 videos by heat within each category, using Music as the example
1. Flatten the category column of the video table and extract it into an intermediate table, video_category (a video with several categories becomes one row per category)
create table video_category stored as orc tblproperties ("orc.compress" = "SNAPPY")
as
select videoid,
uploader,
age,
cate,
length,
`views`,
rate,
ratings,
comments,
relatedid
from video_orc
lateral view explode(category) tbl as cate;
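A quick look at the new table (a sketch) verifies the explode, since each multi-category video should now appear once per category:
select videoid, cate from video_category limit 10;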
2. Query the top 10 Music videos directly from video_category
select videoid, `views`
from video_category
where cate = "Music"
order by `views` desc
limit 10;
6. Top 10 videos by traffic within each category, using Music as the example (per the schema, the ratings column holds the traffic)
select videoid, ratings
from video_category
where cate = "Music"
order by ratings desc
limit 10;
7. The Top 10 users by number of uploaded videos, and their videos whose view counts rank in the top 20
Interpretation 1: for each of the top 10 uploaders, that user's 20 hottest videos
1. Find the Top 10 users by uploaded video count
select uploader, videos
from user_orc
order by videos desc
limit 10;
2. Join with video_orc to find the videos these users uploaded, ranked by heat
select t1.uploader, v.videoid, rank() over (partition by t1.uploader order by v.`views` desc ) hot
from (select uploader, videos
from user_orc
order by videos desc
limit 10) t1
join video_orc v on t1.uploader = v.uploader;
3. Use where to keep each user's top 20
select
t2.uploader ,t2.videoid,t2.hot
from (select t1.uploader, v.videoid, rank() over (partition by t1.uploader order by v.`views` desc ) hot
from (select uploader, videos
from user_orc
order by videos desc
limit 10) t1
left join video_orc v on t1.uploader = v.uploader) t2
where hot<=20;
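A design note: rank() assigns tied view counts the same rank and then skips numbers, so a user can return slightly more than 20 rows when ties occur; row_number() is the alternative if exactly 20 rows per user are required.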
Interpretation 2: check whether any of the Top 20 most-viewed videos were uploaded by the Top 10 users
1. Find the Top 10 users by uploaded video count
select uploader, videos
from user_orc
order by videos desc
limit 10;
2. Find the 20 most-viewed videos
select uploader, `views`, videoid
from video_orc
order by `views` desc
limit 20;
3. Join the two result sets and match on uploader
select t1.uploader,t2.videoid
from (select uploader, videos
from user_orc
order by videos desc
limit 10) t1
left join (select uploader, `views`, videoid
from video_orc
order by `views` desc
limit 20) t2 on t1.uploader = t2.uploader;
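Because this is a left join keyed on the Top 10 uploaders, every one of them appears in the result, with t2.videoid left NULL for any uploader who has no video in the view-count Top 20.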
8. Top 10 videos by view count within each category
1. From video_category, rank the videos within each category: partition by category and order by view count
select
cate,
videoid,
`views`,
rank() over (partition by cate order by `views` desc ) hot
from video_category;
2. Keep each category's top 10
select cate, hot, videoid
from (select cate, videoid, `views`, rank() over (partition by cate order by `views` desc ) hot
from video_category) t1
where t1.hot <= 10;