Hive in Practice

朱小落 2022-03-16

1. Data Structure

 

1. The video table

Field         Description                          Details
video id      unique video id (String)             an 11-character string
uploader      video uploader (String)              username of the user who uploaded the video
age           video age (Int)                      integer number of days the video has been on the platform
category      video categories (Array<String>)     categories assigned to the video at upload time
length        video length (Int)                   video length as an integer
views         view count (Int)                     number of times the video has been viewed
rate          video rating (Double)                rating out of 5
ratings       traffic (Int)                        video traffic, an integer
comments      comment count (Int)                  integer number of comments on the video
related ids   related video ids (Array<String>)    ids of related videos, at most 20

2. The user table

Field       Description                    Type
uploader    uploader username              string
videos      number of uploaded videos      int
friends     number of friends              int

2. ETL Data Cleaning

1. Examine the raw data, work out the field-splitting rules, and clean out invalid records

SDNkMu8ZT68	w00dy911	630	People & Blogs	186	10181	3.49	494	257	rjnbgpPJUks

        Looking at the raw data, each record is a tab-separated line. A video may belong to several categories; within the category field the categories are separated by the '&' character with a space on each side of the separator. A video may also have several related videos, and the multiple related video ids are separated by additional '\t' characters. To make it easier to work with these multi-valued fields during analysis, we first reorganize and clean the data: the categories stay separated by '&' but the surrounding spaces are removed, and the multiple related video ids are re-joined with '&' as well.
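        For the sample record above, only the category field changes (it has a single related id, so there is nothing to re-join there); a record with several related ids would additionally have the tabs between those ids replaced by '&'. After cleaning, the line would look roughly like this:

SDNkMu8ZT68	w00dy911	630	People&Blogs	186	10181	3.49	494	257	rjnbgpPJUks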

2. Write the cleaning program according to the requirements

        1. Create a Maven project, import the required dependencies, and create the log configuration files

        Configure pom.xml

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <!--<dependency>-->
        <!--<groupId>org.apache.hadoop</groupId>-->
        <!--<artifactId>hadoop-client-runtime</artifactId>-->
        <!--<version>3.1.3</version>-->
        <!--</dependency>-->
    </dependencies>

        Configure log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

        Configure log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- Appender type is Console; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <!-- PatternLayout; the output looks like
            [INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- additivity is set to false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- root logger configuration -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>

</Configuration>

        2. Write the Mapper and Driver

package com.hive.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that performs the ETL cleaning
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // counter for records that pass the check
    private Counter pass;
    // counter for records that are discarded
    private Counter fail;
    // reusable StringBuilder for re-joining the fields of a line
    private StringBuilder sb = new StringBuilder();
    // Text object used to write the re-joined line to the context
    private Text result = new Text();
    /**
     * Initialize the pass / fail counters.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        pass = context.getCounter("ETL","PASS");
        fail = context.getCounter("ETL","FAIL");
    }

    /**
     * Process one log line: remove the spaces inside the 4th field (category), change the
     * separators of the trailing related-id fields to '&', and discard lines that do not
     * have enough fields.
     * video id   uploader   age   category        length   views   rate   ratings   comments   related ids
     *   1           2        3       4               5        6      7       8          9           10
     * SDNkMu8ZT68 w00dy911  630  People & Blogs    186     10181   3.49    494        257       rjnbgpPJUks
     *
     * @param key     byte offset of the line
     * @param value   one line of the log
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one line of raw data
        String line = value.toString();
        // split the line into fields by tab
        String[] fields = line.split("\t");

        // the first 9 fields (video id, uploader, age, category, length, views, rate, ratings, comments) are required
        // keep the record only if it has at least 9 fields
        if (fields.length >= 9) {
            // remove the spaces inside the 4th field (category) and write it back
            fields[3] = fields[3].replace(" ", "");

            // re-join the fields into one line, fixing the separators of the trailing related-id fields
            sb.setLength(0); // reset the builder before handling a new line
            for (int i = 0; i < fields.length; i++) {
                // last field of the line: append it with no trailing separator
                if (i == fields.length-1){
                    sb.append(fields[i]);
                }else if (i <= 8){
                    // one of the first nine fields: keep '\t' as the separator
                    sb.append(fields[i]).append("\t");
                }else {
                    // a related-id field: join it with '&'
                    sb.append(fields[i]).append("&");
                }
            }
            result.set(sb.toString());

            context.write(result,NullWritable.get());
            // the record passed the check
            pass.increment(1);
        } else {
            // fewer than 9 fields: discard the record
            fail.increment(1);
        }
    }
}
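
        ETLDriver (the driver for the map-only job):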
package com.hive.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ETLDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(ETLDriver.class);
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);


    }
}

3. Package the program, upload it to the cluster, and run the cleaning job

 yarn jar etltool-1.0-SNAPSHOT.jar com.hive.etl.ETLDriver /gulivideo/video /gulivideo/video_etl

3. Create External Tables in Hive to Map the Data

        1. Create and use the database

create database gulivideo;
use gulivideo;

        2. Create the video external table

create external table video_ori(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
location '/gulivideo/video_etl';

        3. Create the user external table

create external table user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited fields terminated by "\t" 
location '/gulivideo/user';

        4. Create the video ORC managed table

create table video_orc(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
stored as orc
tblproperties("orc.compress"="SNAPPY");

        5. Create the user ORC managed table

create table user_orc(
    uploader string,
    videos int,
    friends int)
stored as orc
tblproperties("orc.compress"="SNAPPY");

        6. Load the data into the ORC tables

-- insert the data from the external tables
insert into table video_orc select * from video_ori;
insert into table user_orc select * from user_ori;
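
After the two inserts finish, an optional sanity check (not part of the original walkthrough) is to compare the row counts of the source and ORC tables:

select count(*) from video_ori;
select count(*) from video_orc;
select count(*) from user_ori;
select count(*) from user_orc;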

4. Business Analysis

1. Top 10 videos by view count

Use ORDER BY on the views column to do a global sort, and limit the output to the first 10 rows.

SELECT
    videoid,
    views
FROM
    video_orc
ORDER BY
    views DESC
LIMIT 10;

2. Top 10 video categories by popularity

1. Define category popularity (here assumed to be the number of videos in each category)

2. Use lateral view explode() to flatten the category column

select videoid,
       cate
from video_orc
         lateral view explode(category) tbl as cate;

 

 3. Group by category and count the videoid column

select count(videoid) vd,
       cate
from video_orc
         lateral view
             explode(category) tbl as cate
group by cate;

 

 4. Sort in descending order and take the top 10 with LIMIT

select count(videoid) vd,
       cate
from video_orc
         lateral view
             explode(category) tbl as cate
group by cate
order by vd desc
limit 10;

 

 

3. For the 20 most-viewed videos, find their categories and how many Top-20 videos each category contains

1. Find the 20 most-viewed videos

select videoid, `views`, category
from video_orc
order by `views` desc
limit 20;

 2. Explode the category column

 lateral view explode(category) tbl as cate

3. Group by category and count the videos in each category (a full query is sketched below)
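
Combining the three steps, a complete query (not spelled out in the original outline; this sketch reuses the lateral view + group by pattern from the previous exercise) could be:

select cate,
       count(videoid) n
from (select videoid, `views`, category
      from video_orc
      order by `views` desc
      limit 20) t1
         lateral view explode(category) tbl as cate
group by cate
order by n desc;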

4. For the Top 50 videos by view count, rank the categories of their related videos

1. Take the Top 50 videos by view count and get their related video ids

select videoid, `views` ,relatedid
from video_orc
order by `views` desc
limit 50;

 2. Explode the related video id column

select explode(relatedid) rvid
from (select videoid, `views`, relatedid
      from video_orc
      order by `views` desc
      limit 50) t1;

 

 3. Join the exploded related video ids back to the original table to get their categories, deduplicating the related video ids

select distinct t2.rvid, v.category
from (select explode(relatedid) rvid
      from (select videoid, `views`, relatedid
            from video_orc
            order by `views` desc
            limit 50) t1) t2
         join video_orc v on t2.rvid = v.videoid;

 

 4. Explode the category column

select cate,rvid
from(select distinct t2.rvid, v.category
     from (select explode(relatedid) rvid
           from (select videoid, `views`, relatedid
                 from video_orc
                 order by `views` desc
                 limit 50) t1) t2
              join video_orc v on t2.rvid = v.videoid) t3
   lateral view explode(category) tbl as cate;

 

5. The previous steps produced the categories; this step computes the popularity of each category

select count(videoid) vd,
       cate
from video_orc
         lateral view
             explode(category) tbl as cate
group by cate;

 6. Join the exploded category list from step 4 with the category popularity table on cate, then sort by popularity

select distinct t4.cate, t5.vd
from (select cate, rvid
      from (select distinct t2.rvid, v.category
            from (select explode(relatedid) rvid
                  from (select videoid, `views`, relatedid
                        from video_orc
                        order by `views` desc
                        limit 50) t1) t2
                     join video_orc v on t2.rvid = v.videoid) t3
               lateral view explode(category) tbl as cate) t4
         join (select count(videoid) vd,
                      cate
               from video_orc
                        lateral view
                            explode(category) tbl as cate
               group by cate) t5 on t4.cate = t5.cate
order by t5.vd desc ;

5. Top 10 most popular videos in each category, using Music as an example

1. Explode the category column of the video table and materialize the result as an intermediate table video_category

create table video_category stored as orc tblproperties ("orc.compress" = "SNAPPY")
as
select videoid,
       uploader,
       age,
       cate,
       length,
       `views`,
       rate,
       ratings,
       comments,
       relatedid
from video_orc
         lateral view explode(category) tbl as cate;

2. Query the Top 10 Music videos directly from video_category

select videoid, `views`
from video_category
where cate = "Music"
order by `views` desc
limit 10;

6. Top 10 videos by traffic (ratings) in each category, using Music as an example

select videoid, ratings
from video_category
where cate = "Music"
order by ratings desc
limit 10;

7. Top 10 users by number of uploaded videos, and the videos they uploaded that rank in the top 20 by view count

Interpretation 1: for each of the Top 10 uploaders, their 20 most-viewed videos

1. Find the Top 10 users by number of uploaded videos

select uploader, videos
from user_orc
order by videos desc
limit 10;

 2. Join with video_orc to find the videos these users uploaded, ranked by view count within each uploader

select t1.uploader, v.videoid, rank() over (partition by t1.uploader order by v.`views` desc ) hot
from (select uploader, videos
      from user_orc
      order by videos desc
      limit 10) t1
         join video_orc v on t1.uploader = v.uploader;

3. Use a WHERE clause to keep the top 20 per user

select
t2.uploader ,t2.videoid,t2.hot
from (select t1.uploader, v.videoid, rank() over (partition by t1.uploader order by v.`views` desc ) hot
      from (select uploader, videos
            from user_orc
            order by videos desc
            limit 10) t1
               left join video_orc v on t1.uploader = v.uploader) t2
where hot<=20;

 Interpretation 2: which of the Top 20 most-viewed videos were uploaded by the Top 10 uploaders

1. Find the Top 10 users by number of uploaded videos

select uploader, videos
from user_orc
order by videos desc
limit 10;

2. Find the Top 20 videos by view count

select uploader, `views`, videoid
from video_orc
order by `views` desc
limit 20;

 3. Join the two result sets and match the uploaders

select t1.uploader,t2.videoid
from (select uploader, videos
      from user_orc
      order by videos desc
      limit 10) t1
         left join (select uploader, `views`, videoid
                    from video_orc
                    order by `views` desc
                    limit 20) t2 on t1.uploader = t2.uploader;

 

8. Top 10 videos by view count in each category

1. From video_category, rank the videos within each category by view count, partitioning the window by category

select cate,
       videoid,
       `views`,
       rank() over (partition by cate order by `views` desc ) hot
from video_category;

 2. Take the top 10 in each category

select cate, hot, videoid
from (select cate, videoid, `views`, rank() over (partition by cate order by `views` desc ) hot
      from video_category) t1
where t1.hot <= 10;
