0
点赞
收藏
分享

微信扫一扫

RocketMq IndexService介绍

菜头粿子园 2021-09-28 阅读 86

系列

  • RocketMq broker 配置文件
  • RocketMq broker 启动流程
  • RocketMq broker CommitLog介绍
  • RocketMq broker consumeQueue介绍
  • RocketMq broker 重试和死信队列
  • RocketMq broker 延迟消息
  • RocketMq IndexService介绍
  • RocketMq 读写分离机制
  • RocketMq broker过期文件删除

开篇

  • 这个系列的主要目的是介绍RocketMq broker的原理和用法,在这个系列当中会介绍 broker 配置文件、broker 启动流程、broker延迟消息、broker消息存储。

  • 这篇文章主要介绍broker IndexService,主要介绍IndexService的数据结构和对应的建索引过程。

IndexFile 介绍

  • IndexFile文件的存储位置是:\store\index${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W4+2000W20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

  • 其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。202000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

  • IndexFile在解决hash冲突的过程中会采用头插法,即所有的冲突数据都往链表的头部进行插入,然后每个新添加的元素都会包含后一个元素的位置,hash对应的slot Table会指向第一个索引元素。在实际元素存储的数据的顺序和查询的顺序是逆向映射的,这点需要理解


IndexFile创建

public class IndexService {

    private static final int MAX_TRY_IDX_CREATE = 3;
    private final DefaultMessageStore defaultMessageStore;
    private final int hashSlotNum;
    private final int indexNum;
    private final String storePath;
    private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public IndexService(final DefaultMessageStore store) {
        this.defaultMessageStore = store;
        // private int maxHashSlotNum = 5000000;
        // private int maxIndexNum = 5000000 * 4;
        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
        this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
    }

    public IndexFile getAndCreateLastIndexFile() {
        IndexFile indexFile = null;
        IndexFile prevIndexFile = null;
        long lastUpdateEndPhyOffset = 0;
        long lastUpdateIndexTimestamp = 0;

        {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
                if (!tmp.isWriteFull()) {
                    indexFile = tmp;
                } else {
                    lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                    lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                    prevIndexFile = tmp;
                }
            }

            this.readWriteLock.readLock().unlock();
        }

        if (indexFile == null) {
            try {
                // 创建的文件名以时间戳作为文件名
                String fileName =
                    this.storePath + File.separator
                        + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
                indexFile =
                    // 文件的hashSlotNum=5000000,indexNum=5000000 * 4
                    new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                        lastUpdateIndexTimestamp);
                this.readWriteLock.writeLock().lock();
                this.indexFileList.add(indexFile);
            } catch (Exception e) {
                log.error("getLastIndexFile exception ", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }

            if (indexFile != null) {
                // 前置文件刷盘
                final IndexFile flushThisFile = prevIndexFile;
                Thread flushThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        IndexService.this.flush(flushThisFile);
                    }
                }, "FlushIndexFileThread");

                flushThread.setDaemon(true);
                flushThread.start();
            }
        }

        return indexFile;
    }
}
  • IndexService在不存在或者当前文件已满的情况下会创建新的indexFile文件呢。
  • indexFile文件的名为当前时间戳、hashSlotNum=5000000,indexNum=5000000 * 4。


Index存储

public class IndexFile {

    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;

    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }


    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // 在Index的文件没有满的情况下放置索引数据
        if (this.indexHeader.getIndexCount() < this.indexNum) {

            // 1、针对key计算hash值
            int keyHash = indexKeyHashMethod(key);
            // 2、记录hash值应该保存的slot的位置
            int slotPos = keyHash % this.hashSlotNum;
            // 3、计算Index文件当中slotPos对应的实际物理偏移
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {
                // 获取absSlotPos位置记录当前存储的index的位移
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                // 4、计算index实际的存储的偏移
                // 实际位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的偏移(已有index的个数*indexSize)
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                // 5、生成Index的对象放置 keyHash、phyOffset、timeDiff、slotValue(保存的该hash值下前一个index的逻辑位移,也就是第几个index对象)、

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                // 6、记录SlotPos的当前的index的个数,即逻辑位移。
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                // 设置Index文件整体的index个数和slot个数、commitLog的最后物理偏移和最新的存储时间戳。
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }
}
  • IndexFile#putKey实现了整个index文件存储过程,由于IndexFile实现的是类似hash的结果,所以存储过程也跟hash的存储流程比较相似。

  • 1、针对key计算hash值,记录hash值应该保存的slot的位置,计算Index文件当中slotPos对应的实际物理偏移。

  • 2、根据slotPos对应的实际物理偏移获取该slot下最新的index文件的逻辑位移,即index linked list的第几个。

  • 3、计算index实际的存储的偏移,实际位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的物理偏移(已有index的个数*indexSize)。

  • 4、生成当前Index的对象放置 keyHash、phyOffset(commitLog的实际偏移量)、timeDiff、slotValue(保存的该hash值下前一个index的逻辑位移,也就是第几个index对象),slotValue起到了链表链接的作用。

  • 5、设置Index文件整体的index个数和slot个数、commitLog的最后物理偏移和最新的存储时间戳。

举报

相关推荐

0 条评论