I. Shard and Replica Settings
1. Shards (shard)
An Elasticsearch cluster can store more data than any single machine can hold, and it achieves this with the shard strategy. Within an index, documents are partitioned (sharded) across multiple shards. Elasticsearch hides the complexity of managing these shards, so that from the outside the many shards look like one large index.
2. Replicas (replica)
To cope with access loads that a single machine cannot serve on its own, an Elasticsearch cluster also employs the replica strategy. It creates redundant copies of every shard in an index, and when handling queries these replicas can be treated just like the primary shard. Replicas also provide high availability and data safety: if the machine hosting a shard goes down, Elasticsearch can recover the data from a replica, avoiding data loss.
3. Settings
number_of_shards
The number of primary shards for each index; the default is 5. This setting cannot be changed after the index has been created.
number_of_replicas
The number of replicas for each primary shard; the default is 1. On a live index this setting can be changed at any time.
For example, we can create a small index with a single primary shard and no replicas (the settings apply only to this index):
PUT /my_temp_index
{
  "settings": {
    "number_of_shards":   1,
    "number_of_replicas": 0
  }
}
Then we can change the number of replicas dynamically with the update-index-settings API:
PUT /my_temp_index/_settings
{
  "number_of_replicas": 1
}
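As a usage sketch (not part of the original requests above), the same two calls can be issued from plain Java with the JDK's built-in java.net.http.HttpClient (Java 11+); it assumes an Elasticsearch node listening on localhost:9200:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class IndexSettingsDemo {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // PUT /my_temp_index : create the index with 1 primary shard, 0 replicas
        String createBody = "{\"settings\":{\"number_of_shards\":1,\"number_of_replicas\":0}}";
        HttpRequest create = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my_temp_index"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(createBody))
                .build();
        System.out.println(client.send(create, HttpResponse.BodyHandlers.ofString()).body());

        // PUT /my_temp_index/_settings : replicas can be changed on a live index
        String updateBody = "{\"number_of_replicas\":1}";
        HttpRequest update = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my_temp_index/_settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(updateBody))
                .build();
        System.out.println(client.send(update, HttpResponse.BodyHandlers.ofString()).body());
    }
}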
By default, an index therefore gets 5 shards with 1 replica each:
[Figure: shards and replicas]
4. Viewing an Index's Shard and Replica Information
(1) View the shard and replica information of all indices: http://localhost:9200/_settings
This returns the settings of every index:
{
  "index2": {
    "settings": {
      "index": {
        "creation_date": "1547693736615",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "-jpUoiQKQYCIL_vc9RWTpA",
        "version": {
          "created": "6010399"
        },
        "provided_name": "index2"
      }
    }
  },
  "index": {
    "settings": {
      "index": {
        "creation_date": "1547462626951",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "psUlEdttRLGjzxXe81A0wg",
        "version": {
          "created": "6010399"
        },
        "provided_name": "index"
      }
    }
  }
}
(2) View the shard and replica information of a single index: http://localhost:9200/{index name}/_settings
For example, http://localhost:9200/index/_settings returns:
{
  "index": {
    "settings": {
      "index": {
        "creation_date": "1547462626951",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "psUlEdttRLGjzxXe81A0wg",
        "version": {
          "created": "6010399"
        },
        "provided_name": "index"
      }
    }
  }
}
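The same information can also be fetched programmatically; here is a minimal sketch with the JDK HttpClient (Java 11+, again assuming a local node):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetSettingsDemo {
    public static void main(String[] args) throws Exception {
        // GET the settings of one index; swap the path for /_settings to list all indices.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/index/_settings"))
                .build();
        String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        System.out.println(body); // JSON like the example above
    }
}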
II. The Sharding Algorithm
Elasticsearch defaults to 5 shards per index, and documents with different ids must be assigned to shards in a consistent, deterministic way. In short, the sharding algorithm hashes the routing value (the document id by default) and applies Math.floorMod (a modulo-like operation) with the number of shards.
The generateShardId method of OperationRouting:
public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    // compute the hash of the routing value (the document id by default)
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    // a modulo-like operation (floorMod) maps the hash onto a shard
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
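To make the formula concrete, here is a small self-contained sketch of the same routing rule. It assumes the common case (no custom routing, no routing partition, an index that was never shrunk, so the routing factor is 1), and it substitutes String.hashCode for Murmur3HashFunction, so the shard numbers it prints will not match a real cluster:

public class ShardRoutingDemo {
    // Stand-in for Murmur3HashFunction.hash; NOT the real Murmur3 hash,
    // used here only to keep the sketch self-contained.
    static int hash(String routing) {
        return routing.hashCode();
    }

    // For an index that was never shrunk, getRoutingNumShards() equals
    // number_of_shards and getRoutingFactor() is 1, so the rule reduces to
    // floorMod(hash(id), numShards).
    static int shardId(String id, int routingNumShards, int routingFactor) {
        return Math.floorMod(hash(id), routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        int numShards = 5; // the default discussed above
        for (String id : new String[]{"doc-1", "doc-2", "doc-3"}) {
            System.out.println(id + " -> shard " + shardId(id, numShards, 1));
        }
    }
}

Because the shard depends only on the hash and the (fixed) shard count, the same id always maps to the same shard; this is also why number_of_shards cannot be modified once the index exists.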
III. Node Shard Information
The shards across the three nodes of the cluster:
[Figure: shard allocation across the three nodes]
Request forwarding:
Based on the routing result, TransportReplicationAction finds the node that holds the target shard; if the shard is on the local node the indexing operation is executed directly, otherwise the request is forwarded to the node that owns the shard.
The doRun method of ReroutePhase:
@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    if (handleBlockExceptions(state)) {
        return;
    }

    // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
    final String concreteIndex = concreteIndex(state);
    final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
    if (indexMetaData == null) {
        retry(new IndexNotFoundException(concreteIndex));
        return;
    }
    if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
        throw new IndexClosedException(indexMetaData.getIndex());
    }

    // resolve all derived request fields, so we can route and apply it
    resolveRequest(indexMetaData, request);
    assert request.shardId() != null : "request shardId must be set in resolveRequest";
    assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";

    // find the routing entry for the primary shard
    final ShardRouting primary = primary(state);
    if (retryIfUnavailable(state, primary)) {
        return;
    }
    // look up the node that currently holds the primary shard
    final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
        // the primary is on the local node, so execute the action locally
        performLocalAction(state, primary, node, indexMetaData);
    } else {
        // the primary is on another node, so forward the request to it
        performRemoteAction(state, primary, node);
    }
}
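Stripped of the Elasticsearch types, the branch at the end of doRun is just a node-id comparison. The sketch below uses hypothetical names (route and the printed action labels are illustrative, not Elasticsearch APIs); only the currentNodeId/getLocalNodeId comparison mirrors the real code:

public class RerouteSketch {
    // Mirrors the final branch of ReroutePhase.doRun: compare the id of the
    // node holding the primary shard with the local node id.
    static String route(String primaryNodeId, String localNodeId) {
        if (primaryNodeId.equals(localNodeId)) {
            return "performLocalAction on " + localNodeId;    // shard is local
        } else {
            return "performRemoteAction -> " + primaryNodeId; // forward the request
        }
    }

    public static void main(String[] args) {
        System.out.println(route("node-1", "node-1")); // executed locally
        System.out.println(route("node-2", "node-1")); // forwarded to node-2
    }
}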