Prepare
Hardware configuration
3 servers, each with 4 CPU cores, 16 GB memory, and a 300 GB HDD
software configuration
Citus Plugin For Pgsql - Quickstart
Test data
table | row count |
---|---|
stream_616d2986bd17682e66aedbb3 | 899,035 |
stream_617fc79a355b7f54684a9c8b | 2,693,935 |
SQL
stream_616d2986bd17682e66aedbb3
default
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_616d2986bd17682e66aedbb3" AS tab0,
  (
    SELECT
      tab0._id,
      UNNEST("f_7d1_ff35_t_labels$textual") AS "col0"
    FROM
      "stream_616d2986bd17682e66aedbb3" AS tab0
  ) AS tab1
WHERE
  ("tab1"."_id" = "tab0"."_id")
GROUP BY
  "diffcol0"
ORDER BY
  "count" DESC NULLS LAST
LIMIT
  50 OFFSET 0;
ngram
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_616d2986bd17682e66aedbb3" AS tab0,
  (
    SELECT
      tab0._id,
      c_unnest_taxonomy_with_null("f_7d1_ff35_t_labels$textual") AS "col0"
    FROM
      "stream_616d2986bd17682e66aedbb3" AS tab0
  ) AS tab1
WHERE
  (
    ("f_n61_af35_ngrams$textual" && (ARRAY['日期_新鲜','值得_信赖']::text[]))
    AND "tab1"."_id" = "tab0"."_id"
  )
GROUP BY
  "diffcol0"
ORDER BY
  "diffcol0" ASC NULLS FIRST
LIMIT
  200 OFFSET 0;
stream_617fc79a355b7f54684a9c8b
default
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_617fc79a355b7f54684a9c8b" AS tab0,
  (
    SELECT
      tab0._id,
      UNNEST("f_jq2_20d7_t_labels$textual") AS "col0"
    FROM
      "stream_617fc79a355b7f54684a9c8b" AS tab0
  ) AS tab1
WHERE
  ("tab1"."_id" = "tab0"."_id")
GROUP BY
  "diffcol0"
ORDER BY
  "count" DESC NULLS LAST
LIMIT
  50 OFFSET 0;
ngram
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_617fc79a355b7f54684a9c8b" AS tab0,
  (
    SELECT
      tab0._id,
      c_unnest_taxonomy_with_null("f_jq2_20d7_t_labels$textual") AS "col0"
    FROM
      "stream_617fc79a355b7f54684a9c8b" AS tab0
  ) AS tab1
WHERE
  (
    ("f_fq2_7de8_ngrams$textual" && (ARRAY['日期_新鲜','值得_信赖']::text[]))
    AND "tab1"."_id" = "tab0"."_id"
  )
GROUP BY
  "diffcol0"
ORDER BY
  "diffcol0" ASC NULLS FIRST
LIMIT
  200 OFFSET 0;
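In both ngram queries the filter uses the array-overlap operator (&&) on the *_ngrams$textual column, and c_unnest_taxonomy_with_null is a custom function defined in this database (its definition is not shown here). For && to be served by an index instead of a sequential scan, the ngram column would normally carry a GIN index; a minimal sketch, assuming the column is text[] and with a made-up index name:

-- hypothetical GIN index on the ngram array column used by the && filter above
CREATE INDEX IF NOT EXISTS idx_ngrams_gin
  ON "stream_616d2986bd17682e66aedbb3"
  USING gin ("f_n61_af35_ngrams$textual");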
testing
Hash mode
citus configure
set citus.shard_count=8;
Sets the shard count that will be used for distributed tables created in the current database.
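For each shard-count run in the tables below, the table is distributed after setting the relevant GUCs; a minimal sketch of one run, assuming the source table already exists on the coordinator (the create_distributed_table call matches the one used later in this document):

-- both settings only affect tables distributed afterwards
SET citus.shard_count = 8;
SET citus.shard_replication_factor = 1;  -- 2 for the "backup" runs
SELECT create_distributed_table('stream_616d2986bd17682e66aedbb3', '_id', 'hash');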
query speed results
Cluster mode report (no backup)
set citus.shard_replication_factor=1;
In the tables below, "cluster" is the query time on the original (non-Citus) cluster; the percentages give the sharded time relative to it.
0.9M rows | cluster | shard 4 | shard 8 | shard 16 | shard 32 |
---|---|---|---|---|---|
default | 3940 ms | 1833 ms | 1713 ms | 1558 ms (39.54%) | 1522 ms (38.62%) |
ngram | 5406 ms | 1714 ms | 1152 ms | 893 ms (16.51%) | 974 ms (18.01%) |
2.7M rows | cluster | shard 4 | shard 8 | shard 16 | shard 32 |
---|---|---|---|---|---|
default | 5719 ms | 2763 ms | 3406 ms | 2750 ms (48.08%) | 2881 ms (50.37%) |
ngram | 9395 ms | 2757 ms | 2214 ms | 1843 ms (19.61%) | 1772 ms (18.86%) |
Cluster mode report (backup)
set citus.shard_replication_factor=2;
0.9M rows | cluster | shard 4 | shard 8 | shard 16 | shard 32 |
---|---|---|---|---|---|
default | 3940 ms | 2020 ms | 1910 ms | 1426 ms (36.19%) | 1327 ms (33.68%) |
ngram | 5406 ms | 1617 ms | 863 ms | 920 ms (17.01%) | 996 ms (18.42%) |
2.7M rows | cluster | shard 4 | shard 8 | shard 16 | shard 32 |
---|---|---|---|---|---|
default | 5719 ms | 2921 ms | 3126 ms | 2384 ms (41.68%) | 2533 ms (44.29%) |
ngram | 9395 ms | 2769 ms | 2090 ms | 1710 ms (18.20%) | 1794 ms (19.09%) |
Cluster mode report (no backup vs. backup)
0.9M rows | no backup (default) | backup (default) | no backup (ngram) | backup (ngram) |
---|---|---|---|---|
shard 4 | 1833 ms | 2020 ms | 1671 ms | 1617 ms |
shard 8 | 1713 ms | 1910 ms | 1152 ms | 863 ms |
shard 16 | 1558 ms | 1426 ms | 893 ms | 920 ms |
shard 32 | 1522 ms | 1327 ms | 974 ms | 996 ms |
2.7M rows | no backup (default) | backup (default) | no backup (ngram) | backup (ngram) |
---|---|---|---|---|
shard 4 | 2763 ms | 2921 ms | 2757 ms | 2769 ms |
shard 8 | 3406 ms | 3126 ms | 2214 ms | 2090 ms |
shard 16 | 2750 ms | 2384 ms | 1843 ms | 1710 ms |
shard 32 | 2881 ms | 2533 ms | 1772 ms | 1794 ms |
Stand-alone mode report
Stand-alone mode did not need to be tested separately; the "cluster" column above already serves as the non-sharded baseline.
conclusion
Based on the tables above, 16 shards perform best in this environment.
A common sizing heuristic: shard count ≈ cpu_core_number * (2-4) * number_of_nodes.
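As a worked example with this test's hardware: 4 cores * (2-4) * 3 worker nodes suggests 24-48 shards in total; in practice the measurements above show little difference between 16 and 32 shards, so 16 shards is already a reasonable operating point for this data size.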
shard backup test
With citus.shard_replication_factor=2, each shard is placed on two different workers:
worker1 (shard table id) | worker2 (shard table id) | worker3 (shard table id) |
---|---|---|
chunk-102087 | chunk-102088 | chunk-102087 |
chunk-102088 | chunk-102089 | chunk-102089 |
chunk-102090 | chunk-102091 | chunk-102090 |
chunk-102091 | | |
worker1 (shard table id) | worker2 (shard table id) | worker3 (shard table id) |
---|---|---|
chunk-102092 | chunk-102093 | chunk-102092 |
chunk-102093 | chunk-102094 | chunk-102094 |
chunk-102095 | chunk-102096 | chunk-102095 |
chunk-102096 | chunk-102097 | chunk-102097 |
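The placement layout above can be read directly from the Citus metadata; a minimal sketch using the standard pg_dist_shard and pg_dist_shard_placement catalogs (the table name is the 0.9M-row test table):

-- list every shard of the test table together with the worker(s) holding a copy of it
SELECT s.shardid, p.nodename, p.nodeport
FROM pg_dist_shard s
JOIN pg_dist_shard_placement p USING (shardid)
WHERE s.logicalrelid = 'stream_616d2986bd17682e66aedbb3'::regclass
ORDER BY s.shardid, p.nodename;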
Citus shard backup failover test
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=510.14..510.27 rows=50 width=40) (actual time=1043.209..1043.220 rows=50 loops=1)
-> Sort (cost=510.14..510.64 rows=200 width=40) (actual time=1043.207..1043.210 rows=50 loops=1)
Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC NULLS LAST
Sort Method: top-N heapsort Memory: 30kB
-> HashAggregate (cost=500.00..503.50 rows=200 width=40) (actual time=1043.111..1043.158 rows=163 loops=1)
Group Key: remote_scan.diffcol0
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=40) (actual time=1042.721..1042.791 rows=954 loops=1)
Task Count: 6
Tuple data received from nodes: 13 kB
Tasks Shown: One of 6
-> Task
Tuple data received from node: 2171 bytes
Node: host=10.163.16.167 port=5434 dbname=db_61b061f8889c8eb9da8fe0d5
-> Finalize GroupAggregate (cost=101649.93..101700.60 rows=200 width=40) (actual time=1022.072..1022.481 rows=157 loops=1)
.......
Planning Time: 3.273 ms
Execution Time: 1043.295 ms
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=510.14..510.27 rows=50 width=40) (actual time=1879.004..1879.014 rows=50 loops=1)
-> Sort (cost=510.14..510.64 rows=200 width=40) (actual time=1879.001..1879.004 rows=50 loops=1)
Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC NULLS LAST
Sort Method: top-N heapsort Memory: 30kB
-> HashAggregate (cost=500.00..503.50 rows=200 width=40) (actual time=1878.908..1878.955 rows=163 loops=1)
Group Key: remote_scan.diffcol0
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=40) (actual time=1878.567..1878.629 rows=954 loops=1)
Task Count: 6
Tuple data received from nodes: 13 kB
Tasks Shown: One of 6
-> Task
Tuple data received from node: 2179 bytes
Node: host=10.163.21.120 port=5434 dbname=db_61b061f8889c8eb9da8fe0d5
-> Finalize GroupAggregate (cost=111112.90..111163.57 rows=200 width=40) (actual time=1803.301..1803.372 rows=158 loops=1)
........
Planning Time: 1.486 ms
Execution Time: 1879.072 ms
(34 rows)
After a worker node is shut down manually, a warning is displayed indicating that query speed is no longer guaranteed, but queries still return complete results, so data integrity is preserved.
If two Citus worker nodes are shut down at random, the query may either fail or succeed, depending on whether both placements of some shard happen to sit on the two downed nodes.
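A minimal sketch of how this failover behaviour can be reproduced (the worker data directory path is made up; the validation query is the one used later in this document):

-- on one worker host, stop PostgreSQL to simulate a failure, e.g.:
--   pg_ctl stop -D /path/to/worker/data -m fast
-- on the coordinator, list the workers registered with Citus:
SELECT * FROM master_get_active_worker_nodes();
-- re-running the test query should still succeed (with a warning) as long as
-- every shard keeps at least one surviving placement.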
rebalance shards test
db_61b061f8889c8eb9da8fe0d5=# /*NO LOAD BLANCE*/ SELECT rebalance_table_shards('stream_616d2986bd17682e66aedbb3');
ERROR: rebalance_table_shards() is only supported on Citus Enterprise
Disk usage test
0.9M rows | total | shard 4 | shard 8 |
---|---|---|---|
no backup | 2094MB | 524 + 524 + 522 + 524 (2094MB) | 263+262+262+261+262+263+262+262 (2097MB) |
backup | 2094MB | 523+524+523+524+524+524+523+523 (4188MB) | 262+262+263+262+262+262+261+262+262+263+262+262+262+261+263+263 (4194MB) |
conclusion
According to the table above, shards are created and distributed evenly across the worker nodes; enabling backup (replication factor 2) doubles the disk usage on the workers.
Disk usage per worker ≈ table_total_size / number_of_nodes * citus.shard_replication_factor
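A quick check against the 0.9M-row table above: 2094 MB / 3 workers * 2 replicas ≈ 1396 MB per worker, or roughly 4188 MB across the cluster, which matches the backup rows of the disk usage table.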
append mode
citus configure
select create_distributed_table('stream_616d2986bd17682e66aedbb3','_id', 'hash')
set citus.shard_max_size='100MB';
Sets the maximum size of an append-distributed shard; once a shard reaches this size, new data goes into a newly created shard.
create distributed tables
test_append=# select create_distributed_table('stream_616d2986bd17682e66aedbb3','_id', 'append');
WARNING: table "stream_616d2986bd17682e66aedbb3" has a UNIQUE or EXCLUDE constraint
DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
HINT: Consider using hash partitioning.
create_distributed_table
--------------------------
Execute SQL:
ERROR: cannot push down this subquery
DETAIL: Currently append partitioned relations with overlapping shard intervals are not supported
Explanation: the query failed because the primary key was used as the distribution column.
Test again:
test_append=# select create_distributed_table('stream_616d2986bd17682e66aedbb3','f_vc_dbb3_Date$temporal', 'append');
WARNING: table "stream_616d2986bd17682e66aedbb3" has a UNIQUE or EXCLUDE constraint
DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
HINT: Consider using hash partitioning.
ERROR: cannot create constraint on "stream_616d2986bd17682e66aedbb3"
DETAIL: Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE).
In short, append mode cannot be used for this table as long as it keeps a primary key that does not include the distribution column.
Citus with PGSQL: one master and two slaves
Architecture diagram
Implementation process
- Install the Citus plug-in on the existing cluster, add the required configuration, and restart (a configuration sketch follows this list).
- Add the Citus worker nodes to the master node:
/*NO LOAD BLANCE*/ SELECT * from master_add_node('worker1_ip', 5432);
- Create a distributed table:
/*NO LOAD BLANCE*/ select create_distributed_table('stream_616d2986bd17682e66aedbb3','_id', 'hash');
- Validation:
SELECT * FROM master_get_active_worker_nodes();
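A minimal sketch of the step-1 configuration (Citus must be preloaded and the extension created on the coordinator and on every worker before master_add_node is run; the second worker address is a placeholder):

-- postgresql.conf on every node (coordinator and workers), followed by a restart:
--   shared_preload_libraries = 'citus'
-- in the target database on every node:
CREATE EXTENSION citus;
-- then, from the coordinator, register each worker (as in the step above):
SELECT * from master_add_node('worker1_ip', 5432);
SELECT * from master_add_node('worker2_ip', 5432);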
SQL validation
Query through Citus:
db_61b061f8889c8eb9da8fe0d5=# explain analyse
SELECT
"col0" AS diffcol0,
count('*') AS count
FROM
"stream_616d2986bd17682e66aedbb3" AS tab0,
(
SELECT
tab0._id,
UNNEST("f_7d1_ff35_t_labels$textual") AS "col0"
FROM
"stream_616d2986bd17682e66aedbb3" as tab0
) AS tab1
WHERE
("tab1"."_id" = "tab0"."_id")
GROUP BY
"diffcol0"
ORDER BY
"count" DESC NULLS LAST
LIMIT
50 OFFSET 0;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=510.14..510.27 rows=50 width=40) (actual time=1038.773..1038.784 rows=50 loops=1)
-> Sort (cost=510.14..510.64 rows=200 width=40) (actual time=1038.772..1038.774 rows=50 loops=1)
Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC NULLS LAST
Sort Method: top-N heapsort Memory: 30kB
-> HashAggregate (cost=500.00..503.50 rows=200 width=40) (actual time=1038.682..1038.728 rows=163 loops=1)
Group Key: remote_scan.diffcol0
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=40) (actual time=1038.332..1038.415 rows=954 loops=1)
Task Count: 6
Tuple data received from nodes: 13 kB
Tasks Shown: One of 6
-> Task
Tuple data received from node: 2179 bytes
Node: host=10.163.16.167 port=5434 dbname=db_61b061f8889c8eb9da8fe0d5
-> Finalize GroupAggregate (cost=102035.79..102086.46 rows=200 width=40) (actual time=1001.646..1001.941 rows=158 loops=1)
Group Key: (unnest(tab0_1."f_7d1_ff35_t_labels$textual"))
-> Gather Merge (cost=102035.79..102082.46 rows=400 width=40) (actual time=1001.600..1005.658 rows=467 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Sort (cost=101035.76..101036.26 rows=200 width=40) (actual time=993.660..993.673 rows=156 loops=3)
Sort Key: (unnest(tab0_1."f_7d1_ff35_t_labels$textual"))
Sort Method: quicksort Memory: 36kB
Worker 0: Sort Method: quicksort Memory: 36kB
Worker 1: Sort Method: quicksort Memory: 36kB
-> Partial HashAggregate (cost=101026.12..101028.12 rows=200 width=40) (actual time=992.773..992.805 rows=156 loops=3)
Group Key: (unnest(tab0_1."f_7d1_ff35_t_labels$textual"))
-> Parallel Hash Join (cost=38198.95..100793.91 rows=46442 width=32) (actual time=162.253..803.286 rows=372815 loops=3)
Hash Cond: (tab0_1._id = tab0._id)
-> ProjectSet (cost=0.00..61187.74 rows=4644200 width=36) (actual time=0.135..448.785 rows=372815 loops=3)
-> Parallel Seq Scan on stream_616d2986bd17682e66aedbb3_102096 tab0_1 (cost=0.00..37618.43 rows=46442 width=36) (actual time=0.024..121.193 rows=49965 loops=3)
-> Parallel Hash (cost=37618.43..37618.43 rows=46442 width=4) (actual time=151.849..151.849 rows=49965 loops=3)
Buckets: 262144 (originally 131072) Batches: 1 (originally 1) Memory Usage: 8960kB
-> Parallel Seq Scan on stream_616d2986bd17682e66aedbb3_102096 tab0 (cost=0.00..37618.43 rows=46442 width=4) (actual time=0.034..116.506 rows=49965 loops=3)
Planning Time: 1.457 ms
Execution Time: 1006.036 ms
Planning Time: 1.402 ms
Execution Time: 1038.842 ms
(36 rows)
Native query (without Citus):
db_61b061f8889c8eb9da8fe0d5=# explain analyse
SELECT
"col0" AS diffcol0,
count('*') AS count
FROM
"stream_616d2986bd17682e66aedbb3" AS tab0,
(
SELECT
tab0._id,
UNNEST("f_7d1_ff35_t_labels$textual") AS "col0"
FROM
"stream_616d2986bd17682e66aedbb3" as tab0
) AS tab1
WHERE
("tab1"."_id" = "tab0"."_id")
GROUP BY
"diffcol0"
ORDER BY
"count" DESC NULLS LAST
LIMIT
50 OFFSET 0;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=671257.48..671257.60 rows=50 width=40) (actual time=3920.311..3991.489 rows=50 loops=1)
-> Sort (cost=671257.48..671257.98 rows=200 width=40) (actual time=3920.310..3920.312 rows=50 loops=1)
Sort Key: (count('*')) DESC NULLS LAST
Sort Method: top-N heapsort Memory: 31kB
-> Finalize GroupAggregate (cost=671200.16..671250.83 rows=200 width=40) (actual time=3919.948..3920.268 rows=163 loops=1)
Group Key: tab1.col0
-> Gather Merge (cost=671200.16..671246.83 rows=400 width=40) (actual time=3919.939..3990.818 rows=480 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Sort (cost=670200.14..670200.64 rows=200 width=40) (actual time=3914.206..3914.215 rows=160 loops=3)
Sort Key: tab1.col0
Sort Method: quicksort Memory: 36kB
Worker 0: Sort Method: quicksort Memory: 36kB
Worker 1: Sort Method: quicksort Memory: 36kB
-> Partial HashAggregate (cost=670190.49..670192.49 rows=200 width=40) (actual time=3913.414..3913.436 rows=160 loops=3)
Group Key: tab1.col0
-> Parallel Hash Join (cost=228747.38..668317.52 rows=374595 width=32) (actual time=2950.412..3465.694 rows=2225782 loops=3)
Hash Cond: (tab1._id = tab0._id)
-> Subquery Scan on tab1 (cost=0.00..421698.18 rows=899027 width=36) (actual time=0.051..1927.125 rows=2225782 loops=3)
-> ProjectSet (cost=0.00..412707.91 rows=37459500 width=36) (actual time=0.048..1614.679 rows=2225782 loops=3)
-> Parallel Seq Scan on stream_616d2986bd17682e66aedbb3 tab0_1 (cost=0.00..222600.95 rows=374595 width=91) (actual time=0.033..476.560 rows=299678 loops=3)
-> Parallel Hash (cost=222600.95..222600.95 rows=374595 width=4) (actual time=527.937..527.938 rows=299678 loops=3)
Buckets: 131072 Batches: 16 Memory Usage: 3264kB
-> Parallel Seq Scan on stream_616d2986bd17682e66aedbb3 tab0 (cost=0.00..222600.95 rows=374595 width=4) (actual time=0.040..455.306 rows=299678 loops=3)
Planning Time: 0.390 ms
Execution Time: 3991.600 ms
(26 rows)
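Side by side, the Citus plan completes in about 1039 ms versus about 3992 ms for the native single-table plan on the same 0.9M-row data, roughly a 3.8x speedup with 3 workers and 6 shard tasks (Task Count: 6 above).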