1 库引擎
1.1 CREATE DATABASE
用于创建指定名称的数据库,语法如下:
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
目前的数据库引擎:
Ø MySQL
Ø MaterializeMySQL
Ø Lazy
Ø Atomic
Ø PostgreSQL
Ø MaterializedPostgreSQL
Ø Replicated
Ø SQLite
1.2 Atomic
支持非阻塞的DROP TABLE和RENAME TABLE查询和原子的EXCHANGE TABLES t1 AND t2查询。默认情况下使用Atomic数据库引擎。建表语句:
CREATEDATABASE
test
[ENGINE= Atomic
];
2 表引擎
2.1 MergeTree表引擎
MergeTree在写入一批数据时,数据总会以数据片段的形式写入磁盘,且数据片段不可修改。为了避免片段过多,ClickHouse会通过后台线程,定期合并这些数据片段,属于相同分区的数据片段会被合成一个新的片段。这种数据片段往复合并的特点,也正是合并树名称的由来
MergeTree作为家族系列最基础的表引擎,主要有以下特点:
Ø 存储的数据按照主键排序:允许创建稀疏索引,从而加快数据查询速度
Ø 支持分区,可以通过PRIMARY KEY语句指定分区字段。
Ø 支持数据副本
Ø 支持数据采样
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]
[SETTINGS name=value, ...]
Ø ENGINE:ENGINE = MergeTree(),MergeTree引擎没有参数
Ø ORDER BY:排序字段。
Ø PARTITION BY:分区字段,可选。
Ø PRIMARY KEY:指定主键,默认主键是排序字段,做为一级索引。可选。
Ø SAMPLE BY:采样字段。可选。
Ø TTL:数据的存活时间。在MergeTree中,可以为某个列字段或整张表设置TTL。当时间到达时,如果是列字段级别的TTL,则会删除这一列的数据;如果是表级别的TTL,则会删除整张表的数据。可选。
Ø SETTINGS:额外的参数配置。可选。
2.2 ReplacingMergeTree表引擎
MergeTree表引擎无法对相同主键的数据进行去重,ClickHouse提供了ReplacingMergeTree引擎,可以针对相同主键的数据进行去重,它能够在合并分区时删除重复的数据。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = ReplacingMergeTree([ver])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
例子
CREATE TABLE emp_replacingmergetree (
emp_id UInt16 COMMENT '员工id',
name String COMMENT '员工姓名',
work_place String COMMENT '工作地点',
age UInt8 COMMENT '员工年龄',
depart String COMMENT '部门',
salary Decimal32(2) COMMENT '工资',
create_time DateTime
)ENGINE=ReplacingMergeTree()
ORDER BY emp_id
PRIMARY KEY emp_id
PARTITION BY toYYYYMMDD(create_time)
TTL create_time + INTERVAL 365 DAY;
-- 插入数据
INSERT INTO emp_replacingmergetree
VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000,now());
INSERT INTO emp_replacingmergetree
VALUES (3,'bob','北京',33,'财务部',50000),(4,'tony','杭州',28,'销售事部',50000,now());
-- 手动执行合并操作
optimize table emp_replacingmergetree final;
Ø [ver]:可选参数,列的版本,可以是UInt、Date或者DateTime类型的字段作为版本号。该参数决定了数据去重的方式。
Ø 当没有指定[ver]参数时,保留最新的数据;如果指定了具体的值,保留最大的版本数据
2.3 Log系列表引擎
Log系列表引擎功能相对简单,主要用于快速写入小表(1百万行左右的表),然后全部读出的场景。即一次写入多次查询。
2.4 外部集成表引擎
ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。
例如直接读取HDFS的文件或者MySQL数据库的表。这些表引擎只负责元数据管理和数据查询,而它们自身通常并不负责数据的写入,数据文件直接由外部系统提供。目前ClickHouse提供了下面的外部集成表引擎:
Ø ODBC:通过指定odbc连接读取数据源
Ø JDBC:通过指定jdbc连接读取数据源;
Ø MySQL:将MySQL作为数据存储,直接查询其数据
Ø HDFS:直接读取HDFS上的特定格式的数据文件;
Ø Kafka:将Kafka数据导入ClickHouse
Ø RabbitMQ:与Kafka类似
-- 创建Mysql引擎表
CREATE TABLE mysql_table1 (
id UInt64,
column1 String
) ENGINE = MySQL('127.0.0.1','数据库名','表名','用户名','密码');
-- 创建Kafka引擎表
CREATE TABLE kafka_table_consumer
(
`id` UInt16,
`name` String
) ENGINE = Kafka() SETTINGS
kafka_broker_list = '120.26.126.158:9092',
kafka_topic_list = 'student',
kafka_group_name = 'group2',
kafka_format = 'JSONEachRow';
2.5 Memory表引擎
Memory表引擎直接将数据保存在内存中,数据既不会被压缩也不会被格式转换。当ClickHouse服务重启的时候,Memory表内的数据会全部丢失。
3 集群
3.1 副本(高可用)
副本的目的主要是保障数据的高可用性,即使一台clickhouse节点宕机,那么也可以从其他服务器获得相同的数据。
3.1.1 副本写入流程
3.1.2 副本配置
打开 ClickHouse 的配置文件/etc/clickhouse-server/config.xml,指定 Zookeeper 集群地址,打开 ClickHouse 的配置文件/etc/clickhouse-server/config.xml,指定 Zookeeper 集群地址。
<zookeeper>
<node index="1">
<host>192.168.68.121</host>
<port>2181</port>
</node>
<node index="2">
<host>192.168.68.122</host>
<port>2181</port>
</node>
<node index="3">
<host>192.168.68.123</host>
<port>2181</port>
</node>
</zookeeper>
3.1.3 副本表
副本只能同步数据,不能同步表结构,所以我们需要在每台机器上自己手动建表。这里演示副本只在 hadoop121 和 hadoop122 两台服务器上操作。
-- hadoop121
create table t_order_rep
(
id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/table/01/t_order_rep','rep_121')
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id,sku_id);
-- hadoop122
create table t_order_rep
(
id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/table/01/t_order_rep','rep_122')
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id,sku_id);
参数解释: ReplicatedMergeTree
中,第一个参数是分片的zk_path 一般按照/clickhouse/table/{shard}/{table_name}
的格式写,如果只有一个分片就写 01 即可;第二个参数是副本名称,相同的分片副本名称不能相同。
在 hadoop121 上执行 insert 语句,在 hadoop121和hadoop122 上执行 select,都可以查询出结果。
3.2 分片集群(3台)
副本虽然能够提高数据的可用性,降低丢失风险,但是对数据的横向扩容(多台机器共同分担一份数据)没有解决。每台机子实际上必须容纳全量数据,对集群的性能并没有提升。
要解决数据水平切分的问题,需要引入分片的概念。通过分片把一份完整的数据进行切分,不同的分片分布到不同的节点上。在通过Distributed表引擎把数据拼接起来一同使用。
3.2.1 metrika.xml的配置
sudo vim /etc/clickhouse-server/config.d/metrika.xml
# 在metrika.xml中我们配置3个分片,每个分片有1个副本,配置如下:
<yandex>
<clickhouse_remote_servers>
<clickhouse_cluster_3shards_1replicas> <!-- 集群名称-->
<shard> <!--集群的第一个分片-->
<internal_replication>true</internal_replication>
<replica> <!--该分片的第一个副本-->
<host>clickhouse-server01</host>
<port>9000</port>
</replica>
</shard>
<shard> <!--集群的第二个分片-->
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-server02</host>
<port>9000</port>
</replica>
</shard>
<shard> <!--集群的第三个分片-->
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-server03</host>
<port>9000</port>
</replica>
</shard>
</clickhouse_cluster_3shards_1replicas>
</clickhouse_remote_servers>
<zookeeper>
< node index="1">
<host> clickhouse-server01</host>
<port>2181</port>
</node>
<node index="2">
<host> clickhouse-server02</host>
<port>2181</port>
</node>
<node index="3">
<host> clickhouse-server03</host>
<port>2181</port>
</node>
</zookeeper>
<macros>
<shard>01</shard> <!--不同机器放的分片数不一样-->
<replica>node1</replica> <!--不同机器放的副本数不一样-->
</macros>
<networks>
<ip>0.0.0.0/0</ip>
</networks>
<clickhouse_compression>
<case>
<min_part_size>10000000000</min_part_size>
<min_part_size_ratio>0.01</min_part_size_ratio>
<method>lz4</method>
</case>
</clickhouse_compression>
</yandex>
注意:需要在每台clickhouse节点上配置metrika.xml文件,并且修改每个节点的 macros配置名称。
<!-- node2节点修改metrika.xml中的宏变量如下:-->
<macros>
<shard>02</shard>
<replica>node2</replica>
</macros>
<!-- node3节点修改metrika.xml中的宏变量如下: -->
<macros>
<shard>03</shard>
<replica>node3</replica>
</macros>
3.2.2 修改config.xml
三台服务器需要在config.xml ,中找到remote_servers 标签, 补充:
<remote_servers incl="clickhouse_remote_servers"> <!--将配置引用过来-->
在config.xml中加入
<zookeeper incl="zookeeper-servers" optional="true" />
<include_from>/etc/clickhouse-server/config.d/metrika.xml</include_from>
<macros incl="macros" optional="true" />
3.2.3 重启服务
三台机器重启服务
sudo systemctl restart clickhouse-server
ps -ef|grep clickhouse
然后执行以下语句
select * from system.clusters;
3.2.4 分布表
Distributed表引擎本身不存储数据,有点类似于MyCat之于MySql,成为一种中间件,通过分布式逻辑表来写入、分发、路由来操作多台节点不同分片的分布式数据。
创建集群表,在集群中任一节点执行以下语句:
create table
st_order_mt_1214 on cluster clickhouse_cluster_3shards_1replicas (
uid UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time
Datetime
) engine
=ReplicatedMergeTree('/clickhouse/tables/{shard}/st_order_mt_1214','{replica}')
partition by toYYYYMMDD(create_time)
primary key (uid)
order by (uid ,sku_id );
完成之后会出现提示信息,并在其他节点也可以查询到该表。
创建分布表
create table
st_order_mt_1214_all on cluster clickhouse_cluster_3shards_1replicas
(
uid UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time
Datetime
)engine = Distributed(集群名称,库名,
st_order_mt_1214,hiveHash(sku_id));
其中参数:
Ø Distributed( 集群名称,库名,本地表名,分片键)
Ø 分片键必须是整型数字 ,也可以rand()