Citus Research Reports

zibianqu · 2022-03-30

Preparation

Hardware configuration

3 servers, each with:

CPU: 4 cores, memory: 16 GB, disk: 300 GB HDD

Software configuration

Citus plugin for PostgreSQL, installed per the quickstart guide.

Test data

stream_616d2986bd17682e66aedbb3: 899,035 rows
stream_617fc79a355b7f54684a9c8b: 2,693,935 rows

SQL

stream_616d2986bd17682e66aedbb3
default
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_616d2986bd17682e66aedbb3" AS tab0,
  (
    SELECT
      tab0._id,
      UNNEST("f_7d1_ff35_t_labels$textual") AS "col0"
    FROM
      "stream_616d2986bd17682e66aedbb3" as tab0
  ) AS tab1
WHERE
  ("tab1"."_id" = "tab0"."_id")
GROUP BY
  "diffcol0"
ORDER BY
  "count" DESC NULLS LAST
LIMIT
  50 OFFSET 0;
ngram
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_616d2986bd17682e66aedbb3" AS tab0,
  (
    SELECT
      tab0._id,
      c_unnest_taxonomy_with_null("f_7d1_ff35_t_labels$textual") AS "col0"
    FROM
      "stream_616d2986bd17682e66aedbb3" as tab0
  ) AS tab1
WHERE
  (
    (
      "f_n61_af35_ngrams$textual" && (ARRAY ['日期_新鲜','值得_信赖']::text [])
    )
    AND "tab1"."_id" = "tab0"."_id"
  )
GROUP BY
  "diffcol0"
ORDER BY
  "diffcol0" ASC NULLS FIRST
LIMIT
  200 OFFSET 0;
stream_617fc79a355b7f54684a9c8b
default
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_617fc79a355b7f54684a9c8b" AS tab0,
  (
    SELECT
      tab0._id,
      UNNEST("f_jq2_20d7_t_labels$textual") AS "col0"
    FROM
      "stream_617fc79a355b7f54684a9c8b" as tab0
  ) AS tab1
WHERE
  ("tab1"."_id" = "tab0"."_id")
GROUP BY
  "diffcol0"
ORDER BY
  "count" DESC NULLS LAST
LIMIT
  50 OFFSET 0;
ngram
explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_617fc79a355b7f54684a9c8b" AS tab0,
  (
    SELECT
      tab0._id,
      c_unnest_taxonomy_with_null("f_jq2_20d7_t_labels$textual") AS "col0"
    FROM
      "stream_617fc79a355b7f54684a9c8b" as tab0
  ) AS tab1
WHERE
  (
    (
      "f_fq2_7de8_ngrams$textual" && (ARRAY ['日期_新鲜','值得_信赖']::text [])
    )
    AND "tab1"."_id" = "tab0"."_id"
  )
GROUP BY
  "diffcol0"
ORDER BY
  "diffcol0" ASC NULLS FIRST
LIMIT
  200 OFFSET 0;

Testing

Hash mode

Citus configuration

set citus.shard_count=8;

Sets the number of shards for distributed tables subsequently created in the current database.
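For reference, a minimal hash-distribution setup looks like this (a sketch based on the standard Citus API; the table and column names are the ones used in this report, and the commands require a running Citus coordinator):

```sql
-- Run on the coordinator; citus.shard_count applies to tables distributed afterwards.
SET citus.shard_count = 8;

-- Distribute the test table by _id with hash partitioning, as in the hash-mode tests below.
SELECT create_distributed_table('stream_616d2986bd17682e66aedbb3', '_id', 'hash');
```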

Query speed results

Cluster mode report (no backup)

set citus.shard_replication_factor=1;

0.9m    | cluster | shard 4 | shard 8 | shard 16         | shard 32
default | 3940 ms | 1833 ms | 1713 ms | 1558 ms (39.54%) | 1522 ms (38.62%)
ngram   | 5406 ms | 1714 ms | 1152 ms | 893 ms (16.51%)  | 974 ms (18.01%)

2.7m    | cluster | shard 4 | shard 8 | shard 16         | shard 32
default | 5719 ms | 2763 ms | 3406 ms | 2750 ms (48.08%) | 2881 ms (50.37%)
ngram   | 9395 ms | 2757 ms | 2214 ms | 1843 ms (19.61%) | 1772 ms (18.86%)
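The percentages in these tables appear to be the sharded runtime divided by the same row's single-cluster baseline, truncated to two decimals; a quick check of that reading (the helper name is mine):

```python
def pct_of_baseline(shard_ms, baseline_ms):
    """Sharded runtime as a percentage of the baseline, truncated to two decimals."""
    return int(shard_ms / baseline_ms * 10000) / 100

# Values from the 0.9m table above:
print(pct_of_baseline(1558, 3940))  # 39.54 (shard 16, default)
print(pct_of_baseline(893, 5406))   # 16.51 (shard 16, ngram)
```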

Cluster mode report (backup)

set citus.shard_replication_factor=2;

0.9m    | cluster | shard 4 | shard 8 | shard 16         | shard 32
default | 3940 ms | 2020 ms | 1910 ms | 1426 ms (36.19%) | 1327 ms (33.68%)
ngram   | 5406 ms | 1617 ms | 863 ms  | 920 ms (17.01%)  | 996 ms (18.42%)

2.7m    | cluster | shard 4 | shard 8 | shard 16         | shard 32
default | 5719 ms | 2921 ms | 3126 ms | 2384 ms (41.68%) | 2533 ms (44.29%)
ngram   | 9395 ms | 2769 ms | 2090 ms | 1710 ms (18.20%) | 1794 ms (19.09%)

Cluster mode report (no backup vs. backup)

0.9m     | no backup (default) | backup (default) | no backup (ngram) | backup (ngram)
shard 4  | 1833 ms             | 2020 ms          | 1671 ms           | 1617 ms
shard 8  | 1713 ms             | 1910 ms          | 1152 ms           | 863 ms
shard 16 | 1558 ms             | 1426 ms          | 893 ms            | 920 ms
shard 32 | 1522 ms             | 1327 ms          | 974 ms            | 996 ms

2.7m     | no backup (default) | backup (default) | no backup (ngram) | backup (ngram)
shard 4  | 2763 ms             | 2921 ms          | 2757 ms           | 2769 ms
shard 8  | 3406 ms             | 3126 ms          | 2214 ms           | 2090 ms
shard 16 | 2750 ms             | 2384 ms          | 1843 ms           | 1710 ms
shard 32 | 2881 ms             | 2533 ms          | 1772 ms           | 1794 ms
Stand-alone mode report

Stand-alone mode did not require a separate test.

Conclusion

Based on the tables above, 16 shards performed best.

A rule of thumb for choosing the shard count:

cpu_core_number * (2-4) * number_of_nodes

Shard backup test

worker1 (shard table id) | worker2 (shard table id) | worker3 (shard table id)
chunk-102087             | chunk-102088             | chunk-102087
chunk-102088             | chunk-102089             | chunk-102089
chunk-102090             | chunk-102091             | chunk-102090
chunk-102091             |                          |

worker1 (shard table id) | worker2 (shard table id) | worker3 (shard table id)
chunk-102092             | chunk-102093             | chunk-102092
chunk-102093             | chunk-102094             | chunk-102094
chunk-102095             | chunk-102096             | chunk-102095
chunk-102096             | chunk-102097             | chunk-102097
Citus shard backup failover test
QUERY PLAN

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=510.14..510.27 rows=50 width=40) (actual time=1043.209..1043.220 rows=50 loops=1)
   ->  Sort  (cost=510.14..510.64 rows=200 width=40) (actual time=1043.207..1043.210 rows=50 loops=1)
         Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC NULLS LAST
         Sort Method: top-N heapsort  Memory: 30kB
         ->  HashAggregate  (cost=500.00..503.50 rows=200 width=40) (actual time=1043.111..1043.158 rows=163 loops=1)
               Group Key: remote_scan.diffcol0
               ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=40) (actual time=1042.721..1042.791 rows=954 loops=1)
                     Task Count: 6
                     Tuple data received from nodes: 13 kB
                     Tasks Shown: One of 6
                     ->  Task
                           Tuple data received from node: 2171 bytes
                           Node: host=10.163.16.167 port=5434 dbname=db_61b061f8889c8eb9da8fe0d5
                           ->  Finalize GroupAggregate  (cost=101649.93..101700.60 rows=200 width=40) (actual time=1022.072..1022.481 rows=157 loops=1)
                           .......

 Planning Time: 3.273 ms
 Execution Time: 1043.295 ms
QUERY PLAN

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=510.14..510.27 rows=50 width=40) (actual time=1879.004..1879.014 rows=50 loops=1)
   ->  Sort  (cost=510.14..510.64 rows=200 width=40) (actual time=1879.001..1879.004 rows=50 loops=1)
         Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC NULLS LAST
         Sort Method: top-N heapsort  Memory: 30kB
         ->  HashAggregate  (cost=500.00..503.50 rows=200 width=40) (actual time=1878.908..1878.955 rows=163 loops=1)
               Group Key: remote_scan.diffcol0
               ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=40) (actual time=1878.567..1878.629 rows=954 loops=1)
                     Task Count: 6
                     Tuple data received from nodes: 13 kB
                     Tasks Shown: One of 6
                     ->  Task
                           Tuple data received from node: 2179 bytes
                           Node: host=10.163.21.120 port=5434 dbname=db_61b061f8889c8eb9da8fe0d5
                           ->  Finalize GroupAggregate  (cost=111112.90..111163.57 rows=200 width=40) (actual time=1803.301..1803.372 rows=158 loops=1)
                           ........

 Planning Time: 1.486 ms
 Execution Time: 1879.072 ms
(34 rows)

After a worker node is shut down manually, a warning indicates that query speed is no longer guaranteed, but data integrity is preserved.

If two Citus worker nodes are shut down at random, the query may fail or may succeed, depending on whether every shard still has a replica on the surviving node.
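A query survives only while every shard keeps at least one replica on a live worker. Using the second placement table above (transcribed by hand; the helper function is illustrative, not a Citus API), this can be checked mechanically:

```python
# Replica placements from the second table above (shard_replication_factor = 2).
placement = {
    "worker1": {102092, 102093, 102095, 102096},
    "worker2": {102093, 102094, 102096, 102097},
    "worker3": {102092, 102094, 102095, 102097},
}
all_shards = set().union(*placement.values())

def query_survives(down_workers):
    """True if every shard still has a replica on some live worker."""
    live = set().union(*(shards for w, shards in placement.items() if w not in down_workers))
    return all_shards <= live

print(query_survives({"worker1"}))             # any single failure is survivable
print(query_survives({"worker1", "worker2"}))  # two failures lose shards 102093 and 102096
```

With this layout any single node can fail safely, but losing two of the three workers always leaves some shard without a replica, which matches the report's observation.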

Rebalance shards test
db_61b061f8889c8eb9da8fe0d5=# /*NO LOAD BLANCE*/ SELECT rebalance_table_shards('stream_616d2986bd17682e66aedbb3');
ERROR:  rebalance_table_shards() is only supported on Citus Enterprise

Disk usage test

0.9m      | total   | shard 4                                   | shard 8
no backup | 2094 MB | 524+524+522+524 (2094 MB)                 | 263+262+262+261+262+263+262+262 (2097 MB)
backup    | 2094 MB | 523+524+523+524+524+524+523+523 (4188 MB) | 262+262+263+262+262+262+261+262+262+263+262+262+262+261+263+263 (4194 MB)
Conclusion

As the table above shows, shards are distributed evenly across the worker nodes, and enabling backup doubles the disk usage on the workers. Per-node disk usage is approximately:

table_total_size / number_of_nodes * citus.shard_replication_factor
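Plugging this report's numbers into the formula (2094 MB table, 3 worker nodes) reproduces the observed average footprint per worker (the function name is mine; actual per-node usage varies slightly with shard placement):

```python
def avg_disk_per_node_mb(table_total_mb, nodes, replication_factor):
    """Average per-worker disk usage for an evenly distributed table."""
    return table_total_mb / nodes * replication_factor

print(avg_disk_per_node_mb(2094, 3, 1))  # 698.0 MB per node, no backup
print(avg_disk_per_node_mb(2094, 3, 2))  # 1396.0 MB per node, backup
```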

Append mode

Citus configuration

select create_distributed_table('stream_616d2986bd17682e66aedbb3','_id', 'hash')

set citus.shard_max_size='100MB';

Sets the maximum size of each shard; once appended data exceeds it, a new shard is created.
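In append mode, shards are created explicitly as data is loaded; a sketch based on the append-distribution API documented for older Citus releases (not executed in this report):

```sql
-- Cap each shard at 100 MB; loads beyond this size roll over into a new shard.
SET citus.shard_max_size = '100MB';

-- Create an empty shard to receive the next appended batch.
SELECT master_create_empty_shard('stream_616d2986bd17682e66aedbb3');
```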

Create a distributed table

test_append=# select create_distributed_table('stream_616d2986bd17682e66aedbb3','_id', 'append'); 
WARNING:  table "stream_616d2986bd17682e66aedbb3" has a UNIQUE or EXCLUDE constraint 
DETAIL:  UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced. 
HINT:  Consider using hash partitioning.  
create_distributed_table 
--------------------------
Execute SQL:
ERROR: cannot push down this subquery
DETAIL: Currently append partitioned relations with overlapping shard intervals are not supported

The query failed because the primary key was used as the distribution column.

Test again:

test_append=# select create_distributed_table('stream_616d2986bd17682e66aedbb3','f_vc_dbb3_Date$temporal', 'append');
WARNING:  table "stream_616d2986bd17682e66aedbb3" has a UNIQUE or EXCLUDE constraint
DETAIL:  UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
HINT:  Consider using hash partitioning.
ERROR:  cannot create constraint on "stream_616d2986bd17682e66aedbb3"
DETAIL:  Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE).

Citus with PostgreSQL: one master and two slaves

Architecture diagram


Implementation process

  1. Install the Citus plug-in on the existing cluster, add the required configuration, and restart.

  2. Add citus worker node to master node.

    /*NO LOAD BLANCE*/ SELECT * from master_add_node('worker1_ip', 5432);

  3. Create a distributed table

    /*NO LOAD BLANCE*/ select create_distributed_table('stream_616d2986bd17682e66aedbb3','_id', 'hash');

  4. Validation

    SELECT * FROM master_get_active_worker_nodes();
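In a standard Citus installation, the configuration referred to in step 1 is the preload setting plus enabling the extension; a minimal sketch (these exact steps are assumed, not shown in this report):

```sql
-- postgresql.conf on the coordinator and every worker, followed by a restart:
--   shared_preload_libraries = 'citus'

-- After the restart, in each database that will use Citus:
CREATE EXTENSION citus;
```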

SQL validation

Query through Citus:
db_61b061f8889c8eb9da8fe0d5=# explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_616d2986bd17682e66aedbb3" AS tab0,
  (
    SELECT
      tab0._id,
      UNNEST("f_7d1_ff35_t_labels$textual") AS "col0"
    FROM
      "stream_616d2986bd17682e66aedbb3" as tab0
  ) AS tab1
WHERE
  ("tab1"."_id" = "tab0"."_id")
GROUP BY
  "diffcol0"
ORDER BY
  "count" DESC NULLS LAST
LIMIT
  50 OFFSET 0;
                                                                                                           QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=510.14..510.27 rows=50 width=40) (actual time=1038.773..1038.784 rows=50 loops=1)
   ->  Sort  (cost=510.14..510.64 rows=200 width=40) (actual time=1038.772..1038.774 rows=50 loops=1)
         Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC NULLS LAST
         Sort Method: top-N heapsort  Memory: 30kB
         ->  HashAggregate  (cost=500.00..503.50 rows=200 width=40) (actual time=1038.682..1038.728 rows=163 loops=1)
               Group Key: remote_scan.diffcol0
               ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=40) (actual time=1038.332..1038.415 rows=954 loops=1)
                     Task Count: 6
                     Tuple data received from nodes: 13 kB
                     Tasks Shown: One of 6
                     ->  Task
                           Tuple data received from node: 2179 bytes
                           Node: host=10.163.16.167 port=5434 dbname=db_61b061f8889c8eb9da8fe0d5
                           ->  Finalize GroupAggregate  (cost=102035.79..102086.46 rows=200 width=40) (actual time=1001.646..1001.941 rows=158 loops=1)
                                 Group Key: (unnest(tab0_1."f_7d1_ff35_t_labels$textual"))
                                 ->  Gather Merge  (cost=102035.79..102082.46 rows=400 width=40) (actual time=1001.600..1005.658 rows=467 loops=1)
                                       Workers Planned: 2
                                       Workers Launched: 2
                                       ->  Sort  (cost=101035.76..101036.26 rows=200 width=40) (actual time=993.660..993.673 rows=156 loops=3)
                                             Sort Key: (unnest(tab0_1."f_7d1_ff35_t_labels$textual"))
                                             Sort Method: quicksort  Memory: 36kB
                                             Worker 0:  Sort Method: quicksort  Memory: 36kB
                                             Worker 1:  Sort Method: quicksort  Memory: 36kB
                                             ->  Partial HashAggregate  (cost=101026.12..101028.12 rows=200 width=40) (actual time=992.773..992.805 rows=156 loops=3)
                                                   Group Key: (unnest(tab0_1."f_7d1_ff35_t_labels$textual"))
                                                   ->  Parallel Hash Join  (cost=38198.95..100793.91 rows=46442 width=32) (actual time=162.253..803.286 rows=372815 loops=3)
                                                         Hash Cond: (tab0_1._id = tab0._id)
                                                         ->  ProjectSet  (cost=0.00..61187.74 rows=4644200 width=36) (actual time=0.135..448.785 rows=372815 loops=3)
                                                               ->  Parallel Seq Scan on stream_616d2986bd17682e66aedbb3_102096 tab0_1  (cost=0.00..37618.43 rows=46442 width=36) (actual time=0.024..121.193 rows=49965 loops=3)
                                                         ->  Parallel Hash  (cost=37618.43..37618.43 rows=46442 width=4) (actual time=151.849..151.849 rows=49965 loops=3)
                                                               Buckets: 262144 (originally 131072)  Batches: 1 (originally 1)  Memory Usage: 8960kB
                                                               ->  Parallel Seq Scan on stream_616d2986bd17682e66aedbb3_102096 tab0  (cost=0.00..37618.43 rows=46442 width=4) (actual time=0.034..116.506 rows=49965 loops=3)
                               Planning Time: 1.457 ms
                               Execution Time: 1006.036 ms
 Planning Time: 1.402 ms
 Execution Time: 1038.842 ms
(36 rows)
Native query (without Citus):
db_61b061f8889c8eb9da8fe0d5=# explain analyse
SELECT
  "col0" AS diffcol0,
  count('*') AS count
FROM
  "stream_616d2986bd17682e66aedbb3" AS tab0,
  (
    SELECT
      tab0._id,
      UNNEST("f_7d1_ff35_t_labels$textual") AS "col0"
    FROM
      "stream_616d2986bd17682e66aedbb3" as tab0
  ) AS tab1
WHERE
  ("tab1"."_id" = "tab0"."_id")
GROUP BY
  "diffcol0"
ORDER BY
  "count" DESC NULLS LAST
LIMIT
  50 OFFSET 0;
                                                                                                   QUERY PLAN

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=671257.48..671257.60 rows=50 width=40) (actual time=3920.311..3991.489 rows=50 loops=1)
   ->  Sort  (cost=671257.48..671257.98 rows=200 width=40) (actual time=3920.310..3920.312 rows=50 loops=1)
         Sort Key: (count('*')) DESC NULLS LAST
         Sort Method: top-N heapsort  Memory: 31kB
         ->  Finalize GroupAggregate  (cost=671200.16..671250.83 rows=200 width=40) (actual time=3919.948..3920.268 rows=163 loops=1)
               Group Key: tab1.col0
               ->  Gather Merge  (cost=671200.16..671246.83 rows=400 width=40) (actual time=3919.939..3990.818 rows=480 loops=1)
                     Workers Planned: 2
                     Workers Launched: 2
                     ->  Sort  (cost=670200.14..670200.64 rows=200 width=40) (actual time=3914.206..3914.215 rows=160 loops=3)
                           Sort Key: tab1.col0
                           Sort Method: quicksort  Memory: 36kB
                           Worker 0:  Sort Method: quicksort  Memory: 36kB
                           Worker 1:  Sort Method: quicksort  Memory: 36kB
                           ->  Partial HashAggregate  (cost=670190.49..670192.49 rows=200 width=40) (actual time=3913.414..3913.436 rows=160 loops=3)
                                 Group Key: tab1.col0
                                 ->  Parallel Hash Join  (cost=228747.38..668317.52 rows=374595 width=32) (actual time=2950.412..3465.694 rows=2225782 loops=3)
                                       Hash Cond: (tab1._id = tab0._id)
                                       ->  Subquery Scan on tab1  (cost=0.00..421698.18 rows=899027 width=36) (actual time=0.051..1927.125 rows=2225782 loops=3)
                                              ->  ProjectSet  (cost=0.00..412707.91 rows=37459500 width=36) (actual time=0.048..1614.679 rows=2225782 loops=3)
                                                   ->  Parallel Seq Scan on stream_616d2986bd17682e66aedbb3 tab0_1 (cost=0.00..222600.95 rows=374595 width=91) (actual time=0.033..476.560 rows=299678 loops=3)
                                       ->  Parallel Hash  (cost=222600.95..222600.95 rows=374595 width=4) (actual time=527.937..527.938 rows=299678 loops=3)
                                             Buckets: 131072  Batches: 16  Memory Usage: 3264kB
                                             ->  Parallel Seq Scan on stream_616d2986bd17682e66aedbb3 tab0  (cost=0.00..222600.95 rows=374595 width=4) (actual time=0.040..455.306 rows=299678 loops=3)
 Planning Time: 0.390 ms
 Execution Time: 3991.600 ms
(26 rows)
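Comparing the two Execution Time lines gives the end-to-end speedup of Citus over the native plan on this query:

```python
citus_ms = 1038.842   # Execution Time of the Citus plan above
native_ms = 3991.600  # Execution Time of the native plan above

print(round(native_ms / citus_ms, 2))  # roughly 3.84x faster
```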