Learning Elasticsearch Internals: How Sharding Works and How Requests Are Routed and Forwarded to Shards


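I. Shard and Replica Settings

1. Shards

An Elasticsearch cluster lets a system store more data than a single machine can hold. It does this through sharding: within an index, the data (documents) is distributed across multiple shards. Elasticsearch hides the complexity of managing shards, so that multiple shards appear as one large index.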

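2. Replicas

To handle request load that is too high for a single machine to serve, an Elasticsearch cluster uses replicas. Each shard of an index gets redundant copies, and when serving queries these replicas can be treated the same way as the primary shard. Replicas also provide high availability and data safety: when the machine hosting a shard goes down, Elasticsearch can recover from a replica and avoid data loss.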

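3. Settings

number_of_shards

The number of primary shards per index. The default is 5 in the 6.x releases used in this article (Elasticsearch 7.0 and later default to 1). This setting cannot be changed after the index is created.

number_of_replicas

The number of replicas of each primary shard. The default is 1. For a live index, this setting can be changed at any time.

For example, we can create a small index with one primary shard and no replicas (the settings apply only to this index):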

PUT /my_temp_index
{
  "settings": {
    "number_of_shards" : 1,
    "number_of_replicas" : 0
  }
}

We can then change the number of replicas dynamically with the update-index-settings API:

PUT /my_temp_index/_settings
{
  "number_of_replicas": 1
}
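
The same settings update can also be issued from Java. The following is a minimal sketch, assuming Java 11+ and its built-in java.net.http classes rather than any Elasticsearch client library. The class and method names (ReplicaSettingsRequest, buildUpdateReplicas) are hypothetical, and the base URL http://localhost:9200 is assumed to match the local cluster used elsewhere in this article. The sketch only builds the request; sending it requires a running cluster.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) PUT /{index}/_settings.
final class ReplicaSettingsRequest {

    // JSON body that sets only number_of_replicas, as in the request above.
    static String body(int replicas) {
        return "{\"number_of_replicas\": " + replicas + "}";
    }

    // baseUrl is an assumption, e.g. "http://localhost:9200".
    static HttpRequest buildUpdateReplicas(String baseUrl, String index, int replicas) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body(replicas)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildUpdateReplicas("http://localhost:9200", "my_temp_index", 1);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body(1));
    }
}
```

To actually send the request, it can be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).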

With the defaults described above (5 primary shards, 1 replica of each):

[Figure: shards and replicas]
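
As a quick check on the arithmetic: every primary shard has number_of_replicas copies, so the cluster holds primaries × (1 + replicas) shard copies in total for an index. A minimal sketch, with a hypothetical class and method name:

```java
// Hypothetical helper for illustration only.
final class ShardCount {

    // Total shard copies for one index = primaries * (1 primary copy + replicas).
    static int totalShardCopies(int numberOfShards, int numberOfReplicas) {
        return numberOfShards * (1 + numberOfReplicas);
    }

    public static void main(String[] args) {
        // 5 primaries with 1 replica each: 5 primaries + 5 replicas.
        System.out.println(totalShardCopies(5, 1));
        // The small index from the earlier example: 1 primary, no replicas.
        System.out.println(totalShardCopies(1, 0));
    }
}
```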

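4. Viewing shard and replica information for indices

(1) View shard and replica information for all indices: http://localhost:9200/_settings

This returns the settings of every index: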

{
  "index2": {
    "settings": {
      "index": {
        "creation_date": "1547693736615",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "-jpUoiQKQYCIL_vc9RWTpA",
        "version": {
          "created": "6010399"
        },
        "provided_name": "index2"
      }
    }
  },
  "index": {
    "settings": {
      "index": {
        "creation_date": "1547462626951",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "psUlEdttRLGjzxXe81A0wg",
        "version": {
          "created": "6010399"
        },
        "provided_name": "index"
      }
    }
  }
}

(2) View shard and replica information for a single index: http://localhost:9200/{index name}/_settings

http://localhost:9200/index/_settings returns:

{
  "index": {
    "settings": {
      "index": {
        "creation_date": "1547462626951",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "psUlEdttRLGjzxXe81A0wg",
        "version": {
          "created": "6010399"
        },
        "provided_name": "index"
      }
    }
  }
}
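
To pull the shard and replica counts out of such a response in code, something like the following could be used. This is only a sketch: the class and method names are hypothetical, the sample string is a trimmed copy of the response above, and the regular expression is a shortcut that relies on the values being quoted integers as shown. A real JSON parser would be the more robust choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads a quoted integer setting from the raw _settings JSON.
final class SettingsPeek {

    // Returns the first integer value found for the setting, or -1 if it is absent.
    static int firstIntSetting(String json, String name) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"(\\d+)\"");
        Matcher m = p.matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    public static void main(String[] args) {
        // Trimmed sample of the response shown above.
        String sample = "{\"index\":{\"settings\":{\"index\":{"
                + "\"number_of_shards\":\"5\",\"number_of_replicas\":\"1\"}}}}";
        System.out.println("shards   = " + firstIntSetting(sample, "number_of_shards"));
        System.out.println("replicas = " + firstIntSetting(sample, "number_of_replicas"));
    }
}
```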

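II. Sharding Algorithm

With the default of 5 shards, documents with different ids in the same index must be assigned to shards in a consistent way before they are persisted. Put simply, the sharding algorithm computes a hash of the routing value (the document id, unless a custom routing value is supplied) and then applies floorMod (similar to taking a remainder) against the number of shards.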
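
The idea can be tried out in a few lines of Java. This is a minimal sketch, not the Elasticsearch implementation: String.hashCode stands in for the Murmur3 hash that Elasticsearch uses, so the shard numbers it prints will not match a real cluster, and the class and method names are hypothetical. It also shows why floorMod is used instead of the % operator: a hash can be negative, and floorMod still yields a valid shard number.

```java
// Hypothetical sketch of hash-then-floorMod routing.
final class SimpleShardRouting {

    // Stand-in hash for illustration only; Elasticsearch uses Murmur3HashFunction.
    static int standInHash(String routing) {
        return routing.hashCode();
    }

    // Maps an id to a shard number in the range [0, numberOfShards).
    static int shardFor(String id, int numberOfShards) {
        return Math.floorMod(standInHash(id), numberOfShards);
    }

    public static void main(String[] args) {
        // For a positive divisor, floorMod is never negative, unlike %.
        System.out.println("floorMod(-7, 5) = " + Math.floorMod(-7, 5));
        System.out.println("-7 % 5          = " + (-7 % 5));
        for (String id : new String[] {"a", "b", "user-42"}) {
            System.out.println(id + " -> shard " + shardFor(id, 5));
        }
    }
}
```

Because the result depends on the number of shards, changing that number would send existing ids to different shards, which is consistent with number_of_shards being fixed at index creation.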

The generateShardId method of OperationRouting:

public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    // compute the hash of the effective routing value, shifted by the partition offset
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    // floorMod (similar to a remainder) selects the shard
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
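
The two extra ingredients in this code, the partition offset and the division by the routing factor, can be illustrated with a standalone sketch. Several things here are assumptions rather than Elasticsearch code: String.hashCode stands in for Murmur3HashFunction, a partition size of 1 is taken to mean "not partitioned", and the routing factor is assumed to be routingNumShards divided by the current number of shards, which is my reading of the comment about shrunk indices. The class and method names are hypothetical.

```java
// Hypothetical standalone version of the logic above, using a stand-in hash.
final class ScaledShardIdSketch {

    // Stand-in for Murmur3HashFunction.hash; real shard numbers will differ.
    static int standInHash(String s) {
        return s.hashCode();
    }

    static int shardId(String id, String routing, int routingPartitionSize,
                       int routingNumShards, int routingFactor) {
        // Without a custom routing value, the id itself is the routing value.
        String effectiveRouting = (routing == null) ? id : routing;
        // Assumption: partition size 1 models a non-partitioned index (offset 0).
        int partitionOffset = routingPartitionSize > 1
                ? Math.floorMod(standInHash(id), routingPartitionSize)
                : 0;
        int hash = standInHash(effectiveRouting) + partitionOffset;
        // Hash against the original shard count, then scale down by the factor.
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Plain index: 5 shards, no custom routing, factor 1.
        System.out.println(shardId("a", null, 1, 5, 1));
        // Partitioned index: same routing value "a", different ids, partition size 3.
        System.out.println(shardId("b", "a", 3, 5, 1));
        System.out.println(shardId("c", "a", 3, 5, 1));
        // Index hashed over 8 routing shards but holding 4 shards: factor 8 / 4 = 2.
        System.out.println(shardId("a", null, 1, 8, 2));
    }
}
```

With a partition size greater than 1, documents that share a routing value are spread over a small group of shards instead of a single one, since the id contributes an offset to the hash.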

III. Node Shard Information

Shards on the three nodes of the cluster:

[Figure: shards on the three nodes of the cluster]

Node shard information:

[Figure: node shard information]

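Request forwarding:

TransportReplicationAction uses the shard computed for the document to find the node that holds that shard's primary. If the primary is on the local node, the index operation is executed locally; if it is on another node, the request is forwarded to that node.

The doRun method of ReroutePhase: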

@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    if (handleBlockExceptions(state)) {
        return;
    }

    // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
    final String concreteIndex = concreteIndex(state);
    final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
    if (indexMetaData == null) {
        retry(new IndexNotFoundException(concreteIndex));
        return;
    }
    if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
        throw new IndexClosedException(indexMetaData.getIndex());
    }

    // resolve all derived request fields, so we can route and apply it
    resolveRequest(indexMetaData, request);
    assert request.shardId() != null : "request shardId must be set in resolveRequest";
    assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";

    // look up the routing entry of the primary shard
    final ShardRouting primary = primary(state);
    if (retryIfUnavailable(state, primary)) {
        return;
    }
    // find the node that currently holds the primary shard
    final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
    // primary is on the local node: execute locally
    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
        performLocalAction(state, primary, node, indexMetaData);
    } else {
        // primary is on a remote node: send the request to that node
        performRemoteAction(state, primary, node);
    }
}
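
Stripped of the Elasticsearch types, the decision at the end of doRun comes down to comparing the node that holds the primary with the local node. The sketch below models only that decision; the map of shard number to node id, the node names, and the returned strings are all hypothetical stand-ins for the cluster state and the perform/retry calls.

```java
import java.util.Map;

// Hypothetical model of the local-versus-remote decision in doRun.
final class PrimaryRoutingSketch {

    // primaryNodeByShard stands in for the routing information in the cluster state.
    static String route(int shardId, Map<Integer, String> primaryNodeByShard, String localNodeId) {
        String primaryNode = primaryNodeByShard.get(shardId);
        if (primaryNode == null) {
            // Loosely corresponds to retryIfUnavailable: no primary assigned right now.
            return "retry: primary for shard " + shardId + " is unavailable";
        }
        if (primaryNode.equals(localNodeId)) {
            // Corresponds to performLocalAction.
            return "local: execute on " + localNodeId;
        }
        // Corresponds to performRemoteAction.
        return "remote: forward to " + primaryNode;
    }

    public static void main(String[] args) {
        Map<Integer, String> primaries = Map.of(0, "node-1", 1, "node-2", 2, "node-3");
        System.out.println(route(0, primaries, "node-1"));
        System.out.println(route(1, primaries, "node-1"));
        System.out.println(route(4, primaries, "node-1"));
    }
}
```

This also means a client can send an index request to any node: the receiving node either executes it or passes it on to the node that holds the primary.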

