文章目录
- 一、Apache Pig概述
- 二、Apache Pig架构
- 三、Apache Pig安装
- 四、Apache Pig执行模式
- 五、Apache Pig执行机制
- 六、Pig Latin基础知识
- 七、简单使用
- 八、Apache Pig与其它组件对比
一、Apache Pig概述
官网:https://pig.apache.org/
官方文档:https://pig.apache.org/docs/r0.17.0/
Apache Pig具有以下特点:
-
丰富的运算符集 - 它提供了许多运算符来执行诸如join,sort,filer等操作。
-
易于编程 - Pig Latin与SQL类似,如果你善于使用SQL,则很容易编写Pig脚本。
-
优化机会 - Apache Pig中的任务自动优化其执行,因此程序员只需要关注语言的语义。
-
可扩展性 - 使用现有的操作符,用户可以开发自己的功能来读取、处理和写入数据。
-
用户定义函数 - Pig提供了在其他编程语言(如Java)中创建用户定义函数的功能,并且可以调用或嵌入到Pig脚本中。
-
处理各种数据 - Apache Pig分析各种数据,无论是结构化还是非结构化,它将结果存储在HDFS中。
Pig包括两部分:
- 用于描述数据流的语言,称为
Pig Latin
,Pig Latin是类似SQL的语言。 - 用于运行PigLatin程序的 执行环境 。一个是 本地 的单JVM执行环境,一个就是在 hadoop集群上的分布式执行环境。
二、Apache Pig架构
1)架构图
在内部,Apache Pig将这些脚本转换为一系列MapReduce作业,因此它使程序员的工作变得轻松。Apache Pig的体系结构如下所示:
2)Apache Pig组件
1、Parser(解析器)
2、Optimizer(优化器)
3、Compiler(编译器)
4、Execution engine(执行引擎)
三、Apache Pig安装
如果使用的不是本地模式,就是必须安装好Haood基础环境,可以参考我之前的文章:
大数据Hadoop原理介绍+安装+实战操作(HDFS+YARN+MapReduce)
1)下载Apache Pig
下载地址:https://pig.apache.org/releases.html
下载最新版本:
$ mkdir -p /opt/bigdata/hadoop/software/pig ; cd /opt/bigdata/hadoop/software/pig
$ wget https://dlcdn.apache.org/pig/pig-0.17.0/pig-0.17.0.tar.gz
$ tar -xf pig-0.17.0.tar.gz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/pig-0.17.0
2)配置环境变量
$ vi /etc/profile
export PIG_HOME=/opt/bigdata/hadoop/server/pig-0.17.0
export PATH=$PATH:${PIG_HOME}/bin
export PIG_CONF_DIR=${PIG_HOME}/conf
# 将 PIG_CLASSPATH 环境变量设置为hadoop集群配置目录的位置(包含 core-site.xml、hdfs-site.xml 和 mapred-site.xml 文件的目录):
export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
# 如果您使用的是 Tez,您还需要将 Tez 配置目录(包含 tez-site.xml 的目录),我这里不加:
# export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop:/tez/conf
# 如果您使用的是 Spark,您还需要指定 SPARK_HOME 并指定 SPARK_JAR,这是您上传 $SPARK_HOME/lib/spark-assembly*.jar 的 hdfs 位置:
# export SPARK_HOME=/mysparkhome/; export SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar
加载生效
$ source /etc/profile
$ pig -version
3)修改配置
$ cd $PIG_CONF_DIR
# 在Pig的 conf 文件夹中,我们有一个名为 pig.properties 的文件。在pig.properties文件中,可以设置如下所示的各种参数。
# 查看配置帮助
$ pig -h properties
四、Apache Pig执行模式
1)本地模式
2)Tez 本地模式
3)Spark 本地模式
上面三个模式是本地模式,下面三个模式是基于Hadoop集群,Hadoop环境部署可以参考我之前的文章:大数据Hadoop原理介绍+安装+实战操作(HDFS+YARN+MapReduce)
4)MapReduce模式(默认模式)
5)Tez 模式
6)Spark 模式
五、Apache Pig执行机制
1)交互模式 (Grunt shell)
调用Grunt Shell
您可以使用 -x 选项以所需的模式(local / MapReduce)调用Grunt shell,如下所示。
【local模式示例】示例一
# 执行完的文件会被删掉,也就是/tmp/passwd执行完会删除
$ cp /etc/passwd /tmp/passwd
$ pig -x local
# 分隔字符串
A = load 'passwd' using PigStorage(':');
# 遍历
B = foreach A generate $0,$2,$6 as id;
# 输出屏幕
dump B;
# 输出到本地文件
store B into 'passwd.out';
【local模式示例】示例二
先准备好数据
$ vi data.txt
001,stu01,18,55
002,stu02,20,50
003,stu03,25,60
运行pig
$ pig -x local
# Tuple(元祖)数据格式
student = LOAD './data.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,height:int);
【MapReduce模式示例】
$ pig
ls
fs -ls
fs -touchz test001
fs -mkdir test
help
2)批处理模式 (脚本)
pig脚本
$ cp /etc/passwd /tmp/passwd
$ vi test01.pig
### pig脚本注释有以下两种:
# 对于多行注释,请使用 /* ...。*/
# 对于单行注释,使用 --
A = load '/tmp/passwd' using PigStorage(':'); -- load the passwd file
B = foreach A generate $0,$2,$6 as p; -- extract the user IDs
store B into 'test01.out'; -- write the results to a file name id.out
执行
# 以各种模式执行
$ pig -x local test01.pig
$ pig -x tez_local test01.pig
$ pig -x spark_local test01.pig
$ pig -x spark test01.pig
$ pig -x tez test01.pig
# MapReduce模式,这里演示一下这个模式,这个模式会提交yarn mr任务
# 先启动hadoop和historyserver服务
$ start-all.sh
$ mr-jobhistory-daemon.sh start historyserver
# 使用这个模式,文件就是HDFS上的
$ hadoop fs -put /tmp/passwd /tmp/
$ pig test01.pig
$ pig -x mapreduce test01.pig
yarn任务
HDFS查看输出结果
3)嵌入式模式(UDF)
六、Pig Latin基础知识
1)数据模型
- Atom(原子)
- Tuple(元组)
- Bag(包)
- Map(映射)
- Relation(关系)
2)数据类型
空值
- 所有上述数据类型的值都可以为NULL。Apache Pig以与SQL相似的方式处理空值。
- null可以是未知值或不存在的值。它用作可选值的占位符。这些空值可以自然发生,也可以是操作的结果。
3)算术操作符
下表描述了Pig Latin的算术运算符。假设a = 10和b = 20。
4)比较运算符
5)类型结构运算符
6)关系操作
关于更多Pig Latin基础知识,可参考官方文档:https://pig.apache.org/docs/r0.17.0/start.html
七、简单使用
1)从文件系统(HDFS / Local)将数据加载到Apache Pig中
语法:
-
relation_name——存储数据的关系变量。
-
输入文件路径(Input file path)——我们必须提到存储文件的HDFS目录。 (在MapReduce模式下)
-
函数(function )——我们必须从Apache Pig提供的一组加载函数( BinStorage,JsonLoader,PigStorage,TextLoader )中选择一个函数。
-
模式(schema)——我们必须定义数据的模式。 我们可以按照以下方式定义所需的模式,例如:(column1 : data type, column2 : data type, column3 : data type);
2)Pig存储数据
语法:
【示例】
$ pig
# 不指定绝对路径就存储当前用户目录下
STORE student INTO 'hdfs://hadoop-node1:8082/pig_Output/ ' USING PigStorage (',');
3)将结果显示打印在屏幕
$ vi data.txt
001,stu01,18,55
002,stu02,20,50
003,stu03,25,60
$ hadoop fs -put data.txt /tmp/
$ student = LOAD '/tmp/data.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,height:int);
# dump不区分大小写
Dump student
dump student
4)描述操作符
Describe Relation_name
Describe student
5)解释运算符
explain Relation_name;
explain student;
6)图解运算符(表结构)
illustrate Relation_name;
illustrate student;
以下操作是类sql操作,如果小伙伴对传统关系型数据了解的话,很容易理解。
7)分组操作(GROUP)
group_data = GROUP Relation_name BY age;
group_data = GROUP student BY age;
dump group_data;
8)协同组操作
student_details.txt
001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi
004,Preethi,Agarwal,21,9848022330,Pune
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar
006,Archana,Mishra,23,9848022335,Chennai
007,Komal,Nayak,24,9848022334,trivendram
008,Bharathi,Nambiayar,24,9848022333,Chennai
employee_details.txt
001,Robin,22,newyork
002,BOB,23,Kolkata
003,Maya,23,Tokyo
004,Sara,25,London
005,David,23,Bhuwaneshwar
006,Maggy,22,Chennai
将文件上传到HDFS
$ hadoop fs -put student_details.txt employee_details.txt /tmp/
pig执行
$ pig
student_details = LOAD '/tmp/student_details.txt' USING PigStorage(',')
as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);
employee_details = LOAD '/tmp/employee_details.txt' USING PigStorage(',')
as (id:int, name:chararray, age:int, city:chararray);
cogroup_data = COGROUP student_details by age, employee_details by age;
Dump cogroup_data;
9)JOIN连接操作
- 自连接
- 内部联接
- 外连接 - 左连接,右连接和完全连接
准备好数据
customers.txt
1,Ramesh,32,Ahmedabad,2000.00
2,Khilan,25,Delhi,1500.00
3,kaushik,23,Kota,2000.00
4,Chaitali,25,Mumbai,6500.00
5,Hardik,27,Bhopal,8500.00
6,Komal,22,MP,4500.00
7,Muffy,24,Indore,10000.00
orders.txt
102,2009-10-08 00:00:00,3,3000
100,2009-10-08 00:00:00,3,1500
101,2009-11-20 00:00:00,2,1560
103,2008-05-20 00:00:00,4,2060
1、自连接
$ pig -x local
customers1 = LOAD './customers.txt' USING PigStorage(',')
as (id:int, name:chararray, age:int, address:chararray, salary:int);
customers2 = LOAD './customers.txt' USING PigStorage(',')
as (id:int, name:chararray, age:int, address:chararray, salary:int);
result = JOIN customers1 BY id, customers2 BY id;
dump result ;
发现就是把两个表连接在一起显示了。
2、内部联接
# 使用本地模式
$ pig -x local
customers = LOAD './customers.txt' USING PigStorage(',')
as (id:int, name:chararray, age:int, address:chararray, salary:int);
orders = LOAD './orders.txt' USING PigStorage(',')
as (oid:int, date:chararray, customer_id:int, amount:int);
# 通过customer id进行关联
result = JOIN customers BY id, orders BY customer_id;
dump result;
3、左外连接
# 使用本地模式
$ pig -x local
customers = LOAD './customers.txt' USING PigStorage(',')
as (id:int, name:chararray, age:int, address:chararray, salary:int);
orders = LOAD './orders.txt' USING PigStorage(',')
as (oid:int, date:chararray, customer_id:int, amount:int);
# 通过customer id进行关联
result = JOIN customers BY id LEFT OUTER, orders BY customer_id;
dump result;
4、右外连接
5、全外联接
result = JOIN customers BY id FULL OUTER, orders BY customer_id;
dump result
6、多个键分组操作
employee.txt
001,Rajiv,Reddy,21,programmer,003
002,siddarth,Battacharya,22,programmer,003
003,Rajesh,Khanna,22,programmer,003
004,Preethi,Agarwal,21,programmer,003
005,Trupthi,Mohanthy,23,programmer,003
006,Archana,Mishra,23,programmer,003
007,Komal,Nayak,24,teamlead,002
008,Bharathi,Nambiayar,24,manager,001
employee_contact.txt
001,9848022337,Rajiv@gmail.com,Hyderabad,003
002,9848022338,siddarth@gmail.com,Kolkata,003
003,9848022339,Rajesh@gmail.com,Delhi,003
004,9848022330,Preethi@gmail.com,Pune,003
005,9848022336,Trupthi@gmail.com,Bhuwaneshwar,003
006,9848022335,Archana@gmail.com,Chennai,003
007,9848022334,Komal@gmail.com,trivendram,002
008,9848022333,Bharathi@gmail.com,Chennai,001
pig执行
$ pig -x local
employee = LOAD './employee.txt' USING PigStorage(',')
as (id:int, firstname:chararray, lastname:chararray, age:int, designation:chararray, jobid:int);
employee_contact = LOAD './employee_contact.txt' USING PigStorage(',')
as (id:int, phone:chararray, email:chararray, city:chararray, jobid:int);
result = JOIN employee BY (id,jobid), employee_contact BY (id,jobid);
10)交叉操作
先准备好数据
customers.txt
1,Ramesh,32,Ahmedabad,2000.00
2,Khilan,25,Delhi,1500.00
3,kaushik,23,Kota,2000.00
4,Chaitali,25,Mumbai,6500.00
5,Hardik,27,Bhopal,8500.00
6,Komal,22,MP,4500.00
7,Muffy,24,Indore,10000.00
orders.txt
102,2009-10-08 00:00:00,3,3000
100,2009-10-08 00:00:00,3,1500
101,2009-11-20 00:00:00,2,1560
103,2008-05-20 00:00:00,4,2060
pig执行
# 使用local模式,但是最好使用MapReduce模式,但是这里为了方便演示,就选择local模式了
$ pig -x local
customers = LOAD './customers.txt' USING PigStorage(',')
as (id:int, name:chararray, age:int, address:chararray, salary:int);
orders = LOAD './orders.txt' USING PigStorage(',')
as (oid:int, date:chararray, customer_id:int, amount:int);
result = CROSS customers, orders;
11)联合操作
先准备好数据
student_data1.txt
001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai
student_data2.txt
7,Komal,Nayak,9848022334,trivendram
8,Bharathi,Nambiayar,9848022333,Chennai
pig执行
# 使用local模式,但是最好使用MapReduce模式,但是这里为了方便演示,就选择local模式了
$ pig -x local
student1 = LOAD './student_data1.txt' USING PigStorage(',')
as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);
student2 = LOAD './student_data2.txt' USING PigStorage(',') as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);
result = UNION student1, student2;
dump restult
其实像JOIN操作、分组操作,交叉操作、联合操作等跟传统数据库操作是一样的,但是本质是不一样,但是可以用传统的数据库的思想去理解语句,如有不清楚的小伙伴,可以给我留言。
12)split
以示例驱动理解,先准备好数据
student_details.txt
001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi
004,Preethi,Agarwal,21,9848022330,Pune
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar
006,Archana,Mishra,23,9848022335,Chennai
007,Komal,Nayak,24,9848022334,trivendram
008,Bharathi,Nambiayar,24,9848022333,Chennai
pig执行
$ pig -x local
student_details = LOAD './student_details.txt' USING PigStorage(',')
as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);
# 现在让我们将关系分为两部分,一部分列出年龄小于23岁的员工,另一部分列出年龄在22至25岁之间的员工。
SPLIT student_details into student_details1 if age<23, student_details2 if (22<age and age>25);
# 打印输出
Dump student_details1;
Dump student_details2;
13)条件过滤操作
连接【12)split】的操作
filter_data = FILTER student_details BY city == 'Chennai';
Dump filter_data;
14)去重操作
连接【12)split】的操作
distinct_data = DISTINCT student_details;
Dump distinct_data;
15)Foreach遍历操作
连接【12)split】的操作
foreach_data = FOREACH student_details GENERATE id,age,city;
Dump foreach_data;
16)Order排序操作
连接【12)split】的操作
order_by_data = ORDER student_details BY age DESC;
Dump order_by_data;
17)Limit限制操作
连接【12)split】的操作
limit_data = LIMIT student_details 4;
Dump limit_data;
18)函数操作
1、评估函数
2、加载和存储函数
3、Bag和Tuple函数
4、字符串函数
5、日期时间函数
6、数学函数
7、用户定义函数
在使用Java编写UDF时,我们可以创建和使用以下三种类型的函数:
-
过滤器功能 - 过滤器功能用作过滤器语句中的条件。 这些函数接受一个Pig值作为输入并返回一个布尔值。
-
评估函数 - Eval函数用于FOREACH-GENERATE语句。 这些函数接受Pig值作为输入并返回Pig结果。
-
代数函数 - 代数函数在FOREACHGENERATE语句中作用于内袋。 这些功能用于在内袋上执行完整的MapReduce操作。
使用Java编写UDF
1、在pom.xml文件添加配置依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.17.0</version>
</dependency>
代码如下:
/**
* 自定义行数
* 在编写UDF时,必须继承EvalFunc类并为exec() 函数提供实现。在这个函数中,写入UDF所需的代码。
*
*/
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class UDFTest001 extends EvalFunc<String>{
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
String str = (String)input.get(0);
return str.toUpperCase();
}
}
在没有错误编译类之后,右键单击UDFTest001.java文件。它给你一个菜单。IDEA导出jar包
使用UDF,在编写UDF并生成Jar文件之后,请按照以下步骤进行操作:
第1步:注册Jar文件
$ pig -x local
REGISTER ./pig.jar
第2步:定义别名
【示例】
DEFINE UDFTest001 UDFTest001()
第3步:使用UDF
准备数据
udfdata.txt
001,Robin,22,newyork
002,BOB,23,Kolkata
003,Maya,23,Tokyo
004,Sara,25,London
005,David,23,Bhuwaneshwar
006,Maggy,22,Chennai
007,Robert,22,newyork
008,Syam,23,Kolkata
009,Mary,25,Tokyo
010,Saran,25,London
011,Stacy,25,Bhuwaneshwar
012,Kelly,22,Chennai
pig执行
$ pig -x local
udfdata = LOAD './udfdata.txt' USING PigStorage(',') as (id:int, name:chararray, age:int, city:chararray);
Upper_case = FOREACH udfdata GENERATE UDFTest001(name);
dump Upper_case
全部把名字转换成大写了
19)执行脚本
# 交互式执行脚本
$ pig -x mapreduce
exec xxx.pig
# 非交互式执行脚本
$ pig -x mapreduce xxx.pig
八、Apache Pig与其它组件对比
1)Apache Pig与MapReduce对比
2)Apache Pig与SQL对比
除了上述差异之外,Apache Pig Latin 还有以下几个优势:
- 允许在管道中分割。
- 允许开发人员在管道中的任何地方存储数据。
- 声明执行计划。
- 提供操作员执行ETL(提取,转换和加载)功能。
3)Apache Pig与Hive对比
Apache Pig到这里就结束了,操作起来还是比较简单,有疑问的小伙伴,欢迎给我留言,小伙伴也可以参考官方文档。后续会有更多关于大数据文章,请耐心等待。