Mac M1 VM CentOS 7 Big Data / Data Lake Testing

On a Mac M1, install CentOS 7.9 in a VMware Fusion virtual machine and build a local Hadoop/Hive/Kafka/Flink/Iceberg stack for data lake testing.

Notes:

Parallels Desktop has no free edition that I could find, so I used VMware Fusion instead (it can also be found online).

The official CentOS 7.9 image does not install successfully in the VM, so I used a third-party build: Installing CentOS 7 on an M1 MacBook.

The JDK is the aarch64 1.8.0_332 build installed via yum.

MySQL is the arm64 build downloaded from the official site.

The big data components are the official binary packages.
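
For the JDK step above, a minimal sketch of the install and a quick architecture check (assuming the stock CentOS 7 repositories, which carry an aarch64 OpenJDK 1.8 build):

# install the aarch64 OpenJDK 1.8 packages from the CentOS repos
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# confirm architecture and version
arch              # expect: aarch64
java -version     # expect: an OpenJDK 1.8.0_3xx aarch64 build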

Cluster Information

| Hostname | Internal IP |
| --- | --- |
| datalake-01 | 10.0.0.10 |
| datalake-02 | 10.0.0.11 |
| datalake-03 | 10.0.0.12 |

Node Configuration

| CPU | Memory | OS |
| --- | --- | --- |
| 4 cores | 8 GB | CentOS 7.9 aarch64 |

Component Versions

| Component | Version |
| --- | --- |
| Java | 1.8.332 (aarch64) |
| Scala | 2.12.15 |
| Hadoop | 3.2.3 |
| ZooKeeper | 3.5.9 |
| Hive | 3.1.3 |
| Kafka | 3.1.1 |
| Flink | 1.14.4 |
| Iceberg | 0.13.1 |
| MySQL | 8.0.15 (aarch64) |

Component Layout

| Component | Service | Nodes |
| --- | --- | --- |
| ZooKeeper | | all 3 nodes |
| Hadoop HA | NameNode | 01, 02 |
| | DataNode | all 3 nodes |
| YARN | ResourceManager | 01, 02 |
| | NodeManager | all 3 nodes |
| Hive | Metastore | 01, 02 |
| | HiveServer2 | 01, 02 |
| Kafka | Broker | all 3 nodes |
| Flink | JobManager | 01, 02 |
| | TaskManager | all 3 nodes |
| MySQL | | 01 |
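
Once everything is running, the HA layout above can be spot-checked from node 01. A hedged sketch, assuming the NameNodes are registered as nn1/nn2 and the ResourceManagers as rm1/rm2 in the Hadoop configuration:

# HDFS HA: one NameNode should report active, the other standby
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
# YARN HA: same check for the ResourceManagers
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
# Java processes on the local node (NameNode, DataNode, QuorumPeerMain, Kafka, ...)
jps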

Installing CentOS 7 in a VMware Virtual Machine

Connecting to the Virtual Servers with Tabby

Connecting to MySQL with Navicat

Environment Variables

CentOS 7 and the Mac are both configured here; the Mac only needs the JDK and Maven.

~/.bash_profile

# java
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.aarch64/jre
# scala
export SCALA_HOME=/opt/apps/scala-2.12.15
# zookeeper
export ZK_HOME=/opt/apps/zookeeper-3.5.9
# hadoop
export HADOOP_HOME=/opt/apps/hadoop-3.2.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
# hive
export HIVE_HOME=/opt/apps/hive-3.1.3
export HIVE_CONF_DIR=$HIVE_HOME/conf
# kafka
export KAFKA_HOME=/opt/apps/kafka-3.1.1
# maven
export M2_HOME=/opt/apps/maven-3.6.3
# flink
export FLINK_HOME=/opt/apps/flink-1.14.4

export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$ZK_HOME/bin:$HIVE_HOME/bin:$KAFKA_HOME/bin:$M2_HOME/bin:$FLINK_HOME/bin
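
After editing, the file has to be re-sourced and, for the cluster-wide variables, copied to the other nodes. A minimal sketch, assuming passwordless SSH between the three VMs:

# apply on the current node
source ~/.bash_profile
# distribute to the other nodes
scp ~/.bash_profile datalake-02:~/
scp ~/.bash_profile datalake-03:~/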

sql-client Testing

Flink Configuration

These checkpoint settings are for the test environment only.

execution.checkpointing.interval: 10s
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION #[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
state.checkpoints.num-retained: 20
execution.checkpointing.mode: EXACTLY_ONCE #[EXACTLY_ONCE, AT_LEAST_ONCE]
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.unaligned: false
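
These settings are assumed to sit in $FLINK_HOME/conf/flink-conf.yaml; the file should be identical on every node of the standalone cluster, so a minimal sync sketch:

# push the edited flink-conf.yaml to the other nodes before starting the cluster
scp $FLINK_HOME/conf/flink-conf.yaml datalake-02:$FLINK_HOME/conf/
scp $FLINK_HOME/conf/flink-conf.yaml datalake-03:$FLINK_HOME/conf/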

Start the Flink Standalone Cluster

start-cluster.sh

# stop
stop-cluster.sh
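
A quick way to confirm the cluster is up is to look for Flink's JVM processes (these are the process names Flink 1.14 standalone uses):

# on the JobManager node(s)
jps | grep StandaloneSessionClusterEntrypoint
# on every TaskManager node
jps | grep TaskManagerRunner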

Web UI

http://datalake-01:8081/

SQL File

sql-client-conf.sql

create catalog hive_catalog with (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://datalake-01:9083',
'clients'='5',
'property-version'='2',
'warehouse'='/user/hive/warehouse/'
);

create catalog hadoop_catalog with (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'property-version' = '2',
'warehouse' = '/user/hive/warehouse/'
);

Start sql-client

sql-client.sh -i ../sql-client-conf.sql

show catalogs;

+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|  hadoop_catalog |
|    hive_catalog |
+-----------------+

Tests

# cdc: Iceberg 0.13 supports CDC writes, but not streaming reads of CDC changes, i.e. streaming reads only see appended data, not updated data
drop table if exists default_catalog.default_database.cdc_source_table;
create table if not exists default_catalog.default_database.cdc_source_table (
id int,
data string,
dt string,
primary key (id) not enforced
) with (
'connector' = 'mysql-cdc',
'hostname' = '10.0.0.10',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'server-time-zone' = 'Asia/Shanghai'
);

set execution.type = streaming;
set execution.result-mode = tableau;

# view the cdc table in streaming mode
select * from default_catalog.default_database.cdc_source_table;

# hive catalog
create database hive_catalog.iceberg_db;

drop table hive_catalog.iceberg_db.iceberg_table;
create table if not exists hive_catalog.iceberg_db.iceberg_table(
id bigint comment 'unique id',
data string,
dt string,
primary key (id) not enforced
) comment 'iceberg test table'
partitioned by (dt)
with(
'format-version' = '2',
'write.distribution-mode'='hash',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='10'
);

# cdc to iceberg
insert into hive_catalog.iceberg_db.iceberg_table select * from default_catalog.default_database.cdc_source_table;

# read the iceberg table in batch mode
select * from hive_catalog.iceberg_db.iceberg_table;

# read the iceberg table in streaming mode
set table.dynamic-table-options.enabled = true;

# newly appended data shows up in the streaming read, but update/delete changes are not yet supported and will raise an error
select * from hive_catalog.iceberg_db.iceberg_table /*+ options('streaming'='true')*/;

insert into hive_catalog.iceberg_db.iceberg_table values(4, 'e', '2022-05-22');

CDC to Upsert-Kafka

# cdc
create table if not exists cdc_source_table (
id int,
data string,
dt string,
primary key (id) not enforced
) with (
'connector' = 'mysql-cdc',
'hostname' = '10.0.0.10',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'server-time-zone' = 'Asia/Shanghai'
);

# upsert-kafka
create table upsert_kafka_table (
id int,
data string,
dt string,
primary key (id) not enforced
) with (
'connector' = 'upsert-kafka',
'topic' = 'test2',
'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
'properties.group.id' = 'testGroup',
'key.format' = 'json',
'value.format' = 'json'
);

# cdc to upsert-kafka: edits to the MySQL rows show up in real time as changes to the keyed data
insert into upsert_kafka_table select * from cdc_source_table;
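
The target topic can be created up front and watched from the console to see the upsert records (keys plus values, with null values acting as deletes). A minimal sketch using the Kafka 3.x CLI; partition and replication counts are simply what fits this 3-node test cluster:

# create the target topic
kafka-topics.sh --create --bootstrap-server datalake-01:9092 --topic test2 --partitions 3 --replication-factor 3
# watch the upsert stream, printing the record keys as well
kafka-console-consumer.sh --bootstrap-server datalake-01:9092 --topic test2 --from-beginning \
  --property print.key=true --property key.separator=' | '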

Kafka to Upsert-Kafka

# kafka to upsert-kafka
# the kafka source has no primary key
create table kafka_source_table (
id int,
data string,
dt string
) with (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);

# reuse the upsert-kafka table defined above
insert into upsert_kafka_table select * from kafka_source_table;

# view the contents of the upsert-kafka table
select * from upsert_kafka_table;

# produce data to Kafka and watch the changes appear
kafka-console-producer.sh --broker-list datalake-01:9092 --topic test

{"id":1, "data":"a", "dt":"2022-05-22"}
{"id":2, "data":"b", "dt":"2022-05-22"}
{"id":3, "data":"c", "dt":"2022-05-22"}
{"id":4, "data":"d", "dt":"2022-05-22"}
{"id":5, "data":"e", "dt":"2022-05-22"}

{"id":1, "data":"aa", "dt":"2022-05-22"}

{"id":1, "data":"DD", "dt":"2022-05-22"}

+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           data |                             dt |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                              a |                     2022-05-22 |
| +I |           2 |                              b |                     2022-05-22 |
| -U |           1 |                              a |                     2022-05-22 |
| +U |           1 |                             aa |                     2022-05-22 |
| -U |           1 |                             aa |                     2022-05-22 |
| +U |           1 |                             DD |                     2022-05-22 |

Issues

1. Flink needs these extra jars to work with Hadoop 3

flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
antlr-runtime-3.5.2.jar
commons-cli-1.5.0.jar

2. Connector jars to add to Flink (see the copy sketch after this list)

flink-sql-connector-hive-3.1.2_2.12-1.14.4.jar
flink-sql-connector-kafka_2.12-1.14.4.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
flink-statebackend-rocksdb_2.12-1.14.4.jar
iceberg-flink-runtime-1.14-0.13.1.jar
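
The jars from items 1 and 2 are assumed to go into $FLINK_HOME/lib on every node, followed by a cluster restart so they are picked up; a minimal sketch:

# copy the downloaded jars into Flink's lib directory (repeat or scp to every node)
cp flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar antlr-runtime-3.5.2.jar commons-cli-1.5.0.jar \
   flink-sql-connector-hive-3.1.2_2.12-1.14.4.jar flink-sql-connector-kafka_2.12-1.14.4.jar \
   flink-sql-connector-mysql-cdc-2.2.1.jar flink-statebackend-rocksdb_2.12-1.14.4.jar \
   iceberg-flink-runtime-1.14-0.13.1.jar $FLINK_HOME/lib/
# restart so the new jars are loaded
stop-cluster.sh && start-cluster.sh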

3. Hadoop 3.2.3 ships a newer guava than Hive; copy it into Hive's lib

cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $HIVE_HOME/lib/
mv $HIVE_HOME/lib/guava-19.0.jar $HIVE_HOME/lib/guava-19.0.jar.bak

4. HADOOP_CLASSPATH

Export it only after HADOOP_HOME has been exported and added to PATH:

export HADOOP_CLASSPATH=`hadoop classpath`

Local Testing

| Tool | Version |
| --- | --- |
| IDEA | 2022.1.0 |
| Maven | 3.6.3 |
| Java | 1.8.332 |
| Scala | 2.12.15 |
| Flink | 1.14.4 |

Copy Configuration Files

Copy core-site.xml and hdfs-site.xml from $HADOOP_HOME/etc/hadoop and hive-site.xml from $HIVE_HOME/conf into the project's resources directory (a sketch follows).
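
A minimal copy sketch, assuming a standard Maven layout; the project path is a placeholder:

# placeholder project location
PROJECT=~/IdeaProjects/datalake-test
cp $HADOOP_HOME/etc/hadoop/core-site.xml $PROJECT/src/main/resources/
cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml $PROJECT/src/main/resources/
cp $HIVE_HOME/conf/hive-site.xml         $PROJECT/src/main/resources/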

log4j Configuration

Create log4j.properties in the resources directory:

log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p - %m%n
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-10c %x - %m%n

Test Code

package com.jt.test

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory

object FlinkCdcTest {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val tableEnv = TableEnvironment.create(settings)

    // hive catalog
    val catalogDDL =
      """
        |create catalog hive_catalog with (
        |  'type' = 'iceberg',
        |  'catalog-type' = 'hive',
        |  'uri' = 'thrift://datalake-01:9083',
        |  'clients' = '5',
        |  'property-version' = '1',
        |  'warehouse' = 'hdfs://nameservice1/user/hive/warehouse/'
        |)
        |""".stripMargin
    tableEnv.executeSql(catalogDDL)

    // iceberg sink table
    val databaseDDL = "create database if not exists iceberg_db"
    tableEnv.executeSql(databaseDDL)

    // val tableDDL =
    //   """
    //     |drop table hive_catalog.iceberg_db.iceberg_table;
    //     |create table if not exists hive_catalog.iceberg_db.iceberg_table(
    //     |  id bigint comment 'unique id',
    //     |  data string,
    //     |  dt string,
    //     |  primary key (id) not enforced
    //     |) comment 'iceberg test table'
    //     |  partitioned by (dt)
    //     |  with(
    //     |    'format-version' = '2',
    //     |    'write.distribution-mode'='hash',
    //     |    'write.metadata.delete-after-commit.enabled'='true',
    //     |    'write.metadata.previous-versions-max'='10'
    //     |);
    //   """.stripMargin
    // tableEnv.executeSql(tableDDL)
    //
    //
    // // cdc source
    // val cdcSourceDDL1 = "drop table if exists default_catalog.default_database.cdc_source_table"
    // tableEnv.executeSql(cdcSourceDDL1)
    //
    // val cdcSourceDDL =
    //   """
    //     |create table if not exists default_catalog.default_database.cdc_source_table (
    //     |  id int,
    //     |  data string,
    //     |  dt string,
    //     |  primary key (id) not enforced
    //     |) with (
    //     |  'connector' = 'mysql-cdc',
    //     |  'hostname' = '10.0.0.10',
    //     |  'port' = '3306',
    //     |  'username' = 'root',
    //     |  'password' = '123456',
    //     |  'database-name' = 'test_db',
    //     |  'table-name' = 'test_tb',
    //     |  'server-time-zone' = 'Asia/Shanghai'
    //     |)
    //   """.stripMargin
    // tableEnv.executeSql(cdcSourceDDL)
    //
    // // cdc to iceberg
    // val cdcToIcebergDML =
    //   """
    //     |insert into hive_catalog.iceberg_db.iceberg_table
    //     |select * from default_catalog.default_database.cdc_source_table
    //     |""".stripMargin
    // tableEnv.executeSql(cdcToIcebergDML)

    // batch read iceberg
    // val showIceberg = "select * from hive_catalog.iceberg_db.iceberg_table"
    // tableEnv.executeSql(showIceberg).print()


    // kafka source
    // val kafkaSourceDDL =
    //   """
    //     |create table kafka_source_table (
    //     |  id int,
    //     |  data string,
    //     |  dt string
    //     |) with (
    //     |  'connector' = 'kafka',
    //     |  'topic' = 'test',
    //     |  'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
    //     |  'properties.group.id' = 'testGroup',
    //     |  'format' = 'json',
    //     |  'scan.startup.mode' = 'earliest-offset',
    //     |  'json.fail-on-missing-field' = 'false',
    //     |  'json.ignore-parse-errors' = 'true'
    //     |)
    //   """.stripMargin
    // tableEnv.executeSql(kafkaSourceDDL)

    // upsert kafka
    val upsertKafkaSourceDDL =
      """
        |create table upsert_kafka_table (
        |  id int,
        |  data string,
        |  dt string,
        |  primary key (id) not enforced
        |) with (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'test2',
        |  'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
        |  'properties.group.id' = 'testGroup',
        |  'properties.scan.startup.mode' = 'earliest-offset',
        |  'key.format' = 'json',
        |  'key.json.ignore-parse-errors' = 'true',
        |  'key.json.fail-on-missing-field' = 'false',
        |  'value.format' = 'json',
        |  'value.json.fail-on-missing-field' = 'false'
        |)
      """.stripMargin
    tableEnv.executeSql(upsertKafkaSourceDDL)

    tableEnv.executeSql("select * from upsert_kafka_table").print()
  }
}

pom Dependencies



<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

    <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
    <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>

    <java.version>1.8</java.version>
    <scala.version>2.12.15</scala.version>
    <scala.binary.version>2.12</scala.binary.version>

    <hadoop.version>3.2.3</hadoop.version>
    <flink.version>1.14.4</flink.version>
    <flink.cdc.version>2.2.1</flink.cdc.version>
    <iceberg.version>0.13.1</iceberg.version>
    <iceberg.flink.version>1.14</iceberg.flink.version>
    <hive.version>3.1.3</hive.version>

    <scope.type>compile</scope.type>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime-${iceberg.flink.version}</artifactId>
        <version>${iceberg.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math3</artifactId>
            </exclusion>
        </exclusions>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${scope.type}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr-runtime</artifactId>
        <version>3.5.2</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>${scala.maven.plugin.version}</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>${maven.assembly.plugin.version}</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
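
With the assembly plugin above, the job can also be packaged and submitted to the standalone cluster instead of being run from the IDE. A hedged sketch; the jar name is a placeholder that depends on the project's artifactId and version:

# build the fat jar (jar-with-dependencies), skipping tests
mvn clean package -DskipTests
# submit to the standalone cluster; replace the jar name with the actual artifact
flink run -c com.jt.test.FlinkCdcTest target/<artifactId>-<version>-jar-with-dependencies.jar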





